lib/freddy/consumers/respond_to_consumer.rb in freddy-jruby-0.6.0 vs lib/freddy/consumers/respond_to_consumer.rb in freddy-jruby-0.7.2

- old
+ new

@@ -1,36 +1,47 @@ class Freddy module Consumers class RespondToConsumer - def initialize(consume_thread_pool, logger) - @consume_thread_pool = consume_thread_pool + def self.consume(*attrs, &block) + new(*attrs).consume(&block) + end + + def initialize(logger:, thread_pool:, destination:, channel:, handler_adapter_factory:) @logger = logger + @consume_thread_pool = thread_pool + @destination = destination + @channel = channel + @handler_adapter_factory = handler_adapter_factory end - def consume(destination, channel, handler_adapter_factory, &block) - consumer = consume_from_destination(destination, channel) do |delivery| - Consumers.log_receive_event(@logger, destination, delivery) + def consume(&block) + consumer = consume_from_destination do |delivery| + Consumers.log_receive_event(@logger, @destination, delivery) - adapter = handler_adapter_factory.for(delivery) + adapter = @handler_adapter_factory.for(delivery) msg_handler = MessageHandler.new(adapter, delivery) block.call(delivery.payload, msg_handler) end ResponderHandler.new(consumer, @consume_thread_pool) end private - def consume_from_destination(destination, channel, &block) - channel.queue(destination).subscribe do |delivery| + def consume_from_destination(&block) + @channel.queue(@destination).subscribe(manual_ack: true) do |delivery| process_message(delivery, &block) end end def process_message(delivery, &block) @consume_thread_pool.process do - block.call(delivery) + begin + block.call(delivery) + ensure + @channel.acknowledge(delivery.tag, false) + end end end end end end