Sha256: d3d9fc619d1e5d91d9febd0eec50c35adf725ef25aefd600d3ffa56e22baf215

Contents?: true

Size: 1.87 KB

Versions: 1

Compression:

Stored size: 1.87 KB

Contents

require 'socket'
require 'json'

require_relative 'event_loop'

module WhirledPeas
  module Frame
    class Consumer
      LOGGER_ID = 'CONSUMER'

      def initialize(template_factory, refresh_rate, logger=NullLogger.new)
        @event_loop = EventLoop.new(template_factory, refresh_rate, logger)
        @logger = logger
        @running = false
        @mutex = Mutex.new
      end

      def start(host:, port:)
        mutex.synchronize { @running = true }
        loop_thread = Thread.new do
          Thread.current.report_on_exception = false
          event_loop.start
        end
        socket = TCPSocket.new(host, port)
        logger.info(LOGGER_ID) { "Connected to #{host}:#{port}" }
        while @running && event_loop.running?
          line = socket.gets
          if line.nil?
            sleep(0.001)
            next
          end
          args = JSON.parse(line)
          name = args.delete('name')
          if [Frame::EOF, Frame::TERMINATE].include?(name)
            logger.info(LOGGER_ID) { "Received #{name} event, stopping..." }
            event_loop.stop if name == Frame::TERMINATE
            @running = false
          else
            duration = args.delete('duration')
            event_loop.enqueue(name, duration, args)
          end
        end
        logger.info(LOGGER_ID) { 'Exited normally' }
        logger.info(LOGGER_ID) { 'Waiting for loop thread to exit' }
        loop_thread.join
      rescue => e
        event_loop.stop if event_loop.running?
        logger.warn(LOGGER_ID) { 'Exited with error' }
        logger.error(LOGGER_ID) { e }
        raise
      ensure
        logger.info(LOGGER_ID) { 'Closing socket' }
        socket.close if socket
      end

      def stop
        logger.info(LOGGER_ID) { 'Stopping...' }
        mutex.synchronize { @running = false }
      end

      private

      attr_reader :event_loop, :logger, :mutex
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
whirled_peas-0.2.0 lib/whirled_peas/frame/consumer.rb