Sha256: 6ab863d29625c58b06ea809bf18d6112d6094f36f526d805834846e000766ddd

Contents?: true

Size: 1.74 KB

Versions: 1

Compression:

Stored size: 1.74 KB

Contents

require 'socket'
require 'json'

require_relative 'loop'

module WhirledPeas
  module Frame
    # Reads frame events from a TCP connection and forwards them to a Loop.
    #
    # Every line read from the socket is parsed as a JSON object. Its 'name'
    # entry identifies the event and its 'duration' entry (if any) is passed
    # along; whatever keys remain are handed to the loop as the event's args.
    # The names Frame::EOF and Frame::TERMINATE end the read loop instead of
    # being enqueued.
    class Consumer
      # Seconds to pause before retrying when the socket yields no line.
      POLL_INTERVAL = 0.001

      # @param template_factory passed through to Loop.new
      # @param refresh_rate passed through to Loop.new
      # @param logger object responding to info/warn/error with a progname
      #   argument and a message block
      def initialize(template_factory, refresh_rate, logger = NullLogger.new)
        @loop = Loop.new(template_factory, refresh_rate, logger)
        @logger = logger
        @running = false
        @mutex = Mutex.new
      end

      # Starts the loop in a background thread, connects to host:port and
      # processes incoming lines until an EOF/TERMINATE event arrives or
      # #stop is called. Blocks until the loop thread has exited.
      #
      # Any StandardError raised while connecting or reading is logged and
      # the loop is asked to stop. An exception that killed the loop thread
      # is re-raised by Thread#join; the socket is still closed in that case.
      #
      # @param host [String] host to connect to
      # @param port [Integer] port to connect to
      def start(host:, port:)
        mutex.synchronize { @running = true }
        loop_thread = Thread.new { loop.start }
        socket = TCPSocket.new(host, port)
        logger.info('CONSUMER') { "Connected to #{host}:#{port}" }
        while running?
          line = socket.gets
          if line.nil?
            # IO#gets returns nil at end-of-stream.
            # NOTE(review): once the peer has closed the connection this
            # keeps polling until #stop is called -- confirm that is intended.
            sleep(POLL_INTERVAL)
            next
          end
          handle_line(line)
        end
        logger.info('CONSUMER') { "Exited normally" }
      rescue => e
        logger.warn('CONSUMER') { "Exited with error" }
        logger.error('CONSUMER') { e.message }
        logger.error('CONSUMER') { e.backtrace.join("\n") }
        loop.stop
      ensure
        begin
          logger.info('CONSUMER') { "Waiting for loop thread to exit" }
          # loop_thread is nil if the thread could not be created.
          loop_thread&.join
        ensure
          # Runs even when join re-raises the loop thread's exception, so the
          # socket is never leaked.
          logger.info('CONSUMER') { "Closing socket" }
          socket&.close
        end
      end

      # Asks the read loop in #start to finish. The flag is checked between
      # lines, so a read that is currently blocked is not interrupted.
      def stop
        logger.info('CONSUMER') { "Stopping..." }
        halt
      end

      private

      attr_reader :loop, :logger, :mutex

      # Parses one JSON line and either enqueues the frame or, for the
      # EOF/TERMINATE events, ends the read loop (TERMINATE also stops the
      # loop itself).
      def handle_line(line)
        args = JSON.parse(line)
        name = args.delete('name')
        if [Frame::EOF, Frame::TERMINATE].include?(name)
          logger.info('CONSUMER') { "Received #{name} event, stopping..." }
          loop.stop if name == Frame::TERMINATE
          halt
        else
          duration = args.delete('duration')
          loop.enqueue(name, duration, args)
        end
      end

      # Reads the running flag under the same mutex that guards its writes.
      def running?
        mutex.synchronize { @running }
      end

      # Clears the running flag under the mutex.
      def halt
        mutex.synchronize { @running = false }
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
whirled_peas-0.1.0 lib/whirled_peas/frame/consumer.rb