lib/freddy/consumers/respond_to_consumer.rb in freddy-jruby-0.6.0 vs lib/freddy/consumers/respond_to_consumer.rb in freddy-jruby-0.7.2
- old
+ new
@@ -1,36 +1,47 @@
class Freddy
module Consumers
class RespondToConsumer
- def initialize(consume_thread_pool, logger)
- @consume_thread_pool = consume_thread_pool
+ def self.consume(*attrs, &block)
+ new(*attrs).consume(&block)
+ end
+
+ def initialize(logger:, thread_pool:, destination:, channel:, handler_adapter_factory:)
@logger = logger
+ @consume_thread_pool = thread_pool
+ @destination = destination
+ @channel = channel
+ @handler_adapter_factory = handler_adapter_factory
end
- def consume(destination, channel, handler_adapter_factory, &block)
- consumer = consume_from_destination(destination, channel) do |delivery|
- Consumers.log_receive_event(@logger, destination, delivery)
+ def consume(&block)
+ consumer = consume_from_destination do |delivery|
+ Consumers.log_receive_event(@logger, @destination, delivery)
- adapter = handler_adapter_factory.for(delivery)
+ adapter = @handler_adapter_factory.for(delivery)
msg_handler = MessageHandler.new(adapter, delivery)
block.call(delivery.payload, msg_handler)
end
ResponderHandler.new(consumer, @consume_thread_pool)
end
private
- def consume_from_destination(destination, channel, &block)
- channel.queue(destination).subscribe do |delivery|
+ def consume_from_destination(&block)
+ @channel.queue(@destination).subscribe(manual_ack: true) do |delivery|
process_message(delivery, &block)
end
end
def process_message(delivery, &block)
@consume_thread_pool.process do
- block.call(delivery)
+ begin
+ block.call(delivery)
+ ensure
+ @channel.acknowledge(delivery.tag, false)
+ end
end
end
end
end
end