lib/whirled_peas/frame/producer.rb in whirled_peas-0.2.0 vs lib/whirled_peas/frame/producer.rb in whirled_peas-0.3.0
- old
+ new
@@ -4,60 +4,47 @@
module WhirledPeas
module Frame
class Producer
LOGGER_ID = 'PRODUCER'
- def self.start(logger: NullLogger.new, host:, port:, &block)
- server = TCPServer.new(host, port)
- client = server.accept
- logger.info(LOGGER_ID) { "Connected to #{host}:#{port}" }
- producer = new(client, logger)
+ def self.produce(event_loop:, logger: NullLogger.new)
+ producer = new(event_loop, logger)
+ logger.info(LOGGER_ID) { 'Starting' }
yield producer
+ logger.info(LOGGER_ID) { 'Done with yield' }
+ producer.send_frame(Frame::EOF)
logger.info(LOGGER_ID) { 'Exited normally' }
rescue => e
- producer.terminate
+ producer.send_frame(Frame::TERMINATE)
logger.warn(LOGGER_ID) { 'Exited with error' }
logger.error(LOGGER_ID) { e }
raise
- ensure
- if client
- logger.info(LOGGER_ID) { 'Closing connection'}
- client.close
- end
end
- def initialize(client, logger=NullLogger.new)
- @client = client
+ def initialize(event_loop, logger=NullLogger.new)
+ @event_loop = event_loop
@logger = logger
@queue = Queue.new
end
- def send(name, duration: nil, args: {})
- client.puts(JSON.generate('name' => name, 'duration' => duration, **args))
+ def send_frame(name, duration: nil, args: {})
+ event_loop.enqueue(name: name, duration: duration, args: args)
logger.debug(LOGGER_ID) { "Sending frame: #{name}" }
end
- def enqueue(name, duration: nil, args: {})
+ def enqueue_frame(name, duration: nil, args: {})
queue.push([name, duration, args])
end
def flush
while !queue.empty?
name, duration, args = queue.pop
- send(name, duration: duration, args: args)
+ send_frame(name: name, duration: duration, args: args)
end
end
- def stop
- send(Frame::EOF)
- end
-
- def terminate
- send(Frame::TERMINATE)
- end
-
private
- attr_reader :client, :logger, :queue
+ attr_reader :event_loop, :logger
end
end
end