lib/whirled_peas/frame/producer.rb in whirled_peas-0.1.0 vs lib/whirled_peas/frame/producer.rb in whirled_peas-0.1.1
- old
+ new
@@ -2,33 +2,40 @@
require 'json'
module WhirledPeas
module Frame
class Producer
+ LOGGER_ID = 'PRODUCER'
+
def self.start(logger: NullLogger.new, host:, port:, &block)
server = TCPServer.new(host, port)
client = server.accept
- logger.info('PRODUCER') { "Connected to #{host}:#{port}" }
+ logger.info(LOGGER_ID) { "Connected to #{host}:#{port}" }
producer = new(client, logger)
yield producer
- logger.info('PRODUCER') { "Exited normally" }
+ logger.info(LOGGER_ID) { 'Exited normally' }
rescue => e
- logger.warn('PRODUCER') { "Exited with error" }
- logger.error('PRODUCER') { e.message }
- logger.error('PRODUCER') { e.backtrace.join("\n") }
+ producer.terminate
+ logger.warn(LOGGER_ID) { 'Exited with error' }
+ logger.error(LOGGER_ID) { e.message }
+ logger.error(LOGGER_ID) { e.backtrace.join("\n") }
+ raise
ensure
- client.close if client
+ if client
+ logger.info(LOGGER_ID) { 'Closing connection'}
+ client.close
+ end
end
def initialize(client, logger=NullLogger.new)
@client = client
@logger = logger
@queue = Queue.new
end
def send(name, duration: nil, args: {})
client.puts(JSON.generate('name' => name, 'duration' => duration, **args))
- logger.debug('PRODUCER') { "Sending frame: #{name}" }
+ logger.debug(LOGGER_ID) { "Sending frame: #{name}" }
end
def enqueue(name, duration: nil, args: {})
queue.push([name, duration, args])
end