Sha256: aba1ca8497ff21056fd33658422989eafa2515b2bef664092223c44ff30ae8c6
Contents?: true
Size: 1.29 KB
Versions: 6
Compression:
Stored size: 1.29 KB
Contents
class Freddy module Consumers class RespondToConsumer def self.consume(*attrs, &block) new(*attrs).consume(&block) end def initialize(logger:, thread_pool:, destination:, channel:, handler_adapter_factory:) @logger = logger @consume_thread_pool = thread_pool @destination = destination @channel = channel @handler_adapter_factory = handler_adapter_factory end def consume(&block) consumer = consume_from_destination do |delivery| Consumers.log_receive_event(@logger, @destination, delivery) adapter = @handler_adapter_factory.for(delivery) msg_handler = MessageHandler.new(adapter, delivery) block.call(delivery.payload, msg_handler) end ResponderHandler.new(consumer, @consume_thread_pool) end private def consume_from_destination(&block) @channel.queue(@destination).subscribe(manual_ack: true) do |delivery| process_message(delivery, &block) end end def process_message(delivery, &block) @consume_thread_pool.process do begin block.call(delivery) ensure @channel.acknowledge(delivery.tag, false) end end end end end end
Version data entries
6 entries across 6 versions & 2 rubygems