Sha256: d9b600c472cb50fbfd2c674790840b72c5a0d0e8c03368b786d8b379f1e8cb8a
Contents?: true
Size: 1.76 KB
Versions: 1
Compression:
Stored size: 1.76 KB
Contents
class Freddy
  module Consumers
    # Subscribes to a destination queue with manual acknowledgement and hands
    # every delivery, together with a MessageHandler, to a caller-supplied block.
    # Each delivery is processed through the given thread pool, inside a trace
    # that is exposed via Freddy.trace for the duration of the handler.
    class RespondToConsumer
      # Builds a consumer and immediately starts consuming.
      #
      # Keyword arguments are forwarded explicitly: forwarding them through a
      # bare splat turns them into a positional hash on Ruby 3+, which the
      # keyword-only initializer rejects.
      #
      # @param attrs [Array] positional arguments forwarded to {#initialize}
      # @param opts [Hash] keyword arguments forwarded to {#initialize}
      # @yield [payload, msg_handler] see {#consume}
      # @return [ResponderHandler] see {#consume}
      def self.consume(*attrs, **opts, &block)
        new(*attrs, **opts).consume(&block)
      end

      # @param thread_pool object responding to +process+; runs each delivery's job
      # @param destination name of the queue to subscribe to
      # @param channel object providing +queue+ and +acknowledge+
      # @param handler_adapter_factory object whose +for(delivery)+ returns the
      #   adapter passed to MessageHandler
      def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:)
        @consume_thread_pool = thread_pool
        @destination = destination
        @channel = channel
        @handler_adapter_factory = handler_adapter_factory
      end

      # Starts consuming from the destination queue.
      #
      # @yield [payload, msg_handler] called for every delivery with the
      #   delivery's payload and a MessageHandler built for that delivery
      # @return [ResponderHandler] wraps the subscription and the thread pool
      def consume(&block)
        consumer = consume_from_destination do |delivery|
          adapter = @handler_adapter_factory.for(delivery)
          msg_handler = MessageHandler.new(adapter, delivery)
          block.call(delivery.payload, msg_handler)
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      # Subscribes to the destination queue with manual acknowledgement and
      # routes every delivery through #process_message.
      def consume_from_destination(&block)
        @channel.queue(@destination).subscribe(manual_ack: true) do |delivery|
          process_message(delivery, &block)
        end
      end

      # Runs the handler for a single delivery on the thread pool, wrapped in a
      # trace. The delivery is acknowledged and the trace is cleaned up whether
      # or not the handler (or the trace setup itself) raises.
      def process_message(delivery, &block)
        @consume_thread_pool.process do
          begin
            Freddy.trace = delivery.build_trace("freddy:respond:#{@destination}",
              tags: {
                'peer.address': "#{@destination}:#{delivery.payload[:type]}",
                'component': 'freddy',
                'span.kind': 'server' # RPC
              }
            )
            Freddy.trace.log_kv(
              event: 'Received message through respond_to',
              queue: @destination,
              payload: delivery.payload,
              correlation_id: delivery.correlation_id
            )

            block.call(delivery)
          ensure
            begin
              @channel.acknowledge(delivery.tag, false)
            ensure
              # Nested so that a failing acknowledge cannot leave a trace
              # unfinished or still installed in Freddy.trace.
              finish_current_trace
            end
          end
        end
      end

      # Finishes the trace currently stored in Freddy.trace, if any, and always
      # clears it. The nil guard matters when build_trace raised before a trace
      # was installed: calling +finish+ on nil would otherwise replace the
      # original exception with a NoMethodError.
      def finish_current_trace
        trace = Freddy.trace
        trace.finish if trace
      ensure
        Freddy.trace = nil
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
freddy-1.3.3 | lib/freddy/consumers/respond_to_consumer.rb |