lib/whirled_peas/frame/producer.rb in whirled_peas-0.2.0 vs lib/whirled_peas/frame/producer.rb in whirled_peas-0.3.0

- old
+ new

@@ -4,60 +4,47 @@ module WhirledPeas module Frame class Producer LOGGER_ID = 'PRODUCER' - def self.start(logger: NullLogger.new, host:, port:, &block) - server = TCPServer.new(host, port) - client = server.accept - logger.info(LOGGER_ID) { "Connected to #{host}:#{port}" } - producer = new(client, logger) + def self.produce(event_loop:, logger: NullLogger.new) + producer = new(event_loop, logger) + logger.info(LOGGER_ID) { 'Starting' } yield producer + logger.info(LOGGER_ID) { 'Done with yield' } + producer.send_frame(Frame::EOF) logger.info(LOGGER_ID) { 'Exited normally' } rescue => e - producer.terminate + producer.send_frame(Frame::TERMINATE) logger.warn(LOGGER_ID) { 'Exited with error' } logger.error(LOGGER_ID) { e } raise - ensure - if client - logger.info(LOGGER_ID) { 'Closing connection'} - client.close - end end - def initialize(client, logger=NullLogger.new) - @client = client + def initialize(event_loop, logger=NullLogger.new) + @event_loop = event_loop @logger = logger @queue = Queue.new end - def send(name, duration: nil, args: {}) - client.puts(JSON.generate('name' => name, 'duration' => duration, **args)) + def send_frame(name, duration: nil, args: {}) + event_loop.enqueue(name: name, duration: duration, args: args) logger.debug(LOGGER_ID) { "Sending frame: #{name}" } end - def enqueue(name, duration: nil, args: {}) + def enqueue_frame(name, duration: nil, args: {}) queue.push([name, duration, args]) end def flush while !queue.empty? name, duration, args = queue.pop - send(name, duration: duration, args: args) + send_frame(name: name, duration: duration, args: args) end end - def stop - send(Frame::EOF) - end - - def terminate - send(Frame::TERMINATE) - end - private - attr_reader :client, :logger, :queue + attr_reader :event_loop, :logger end end end