Sha256: 2f2216f2e1e65b332be7c3e1d8e91c20f040655e7ae779d5e2dcb80087fc6c12

Contents?: true

Size: 1.72 KB

Versions: 2

Compression:

Stored size: 1.72 KB

Contents

class Freddy
  module Consumers
    # Subscribes to a destination queue and hands every received delivery to
    # a caller-supplied block, together with a MessageHandler built for that
    # delivery.
    #
    # Each delivery is processed on the supplied thread pool, wrapped in a
    # trace scope, and acknowledged on the channel once the block finishes
    # (whether it returned normally or raised).
    class RespondToConsumer
      # Builds a consumer and immediately starts consuming.
      #
      # Keyword arguments are captured and forwarded explicitly: with a bare
      # `*attrs` splat, Ruby 3 turns them into a positional Hash, which
      # `initialize` (keyword-only) rejects with ArgumentError.
      #
      # @param attrs [Array] positional arguments forwarded to {#initialize}
      # @param kwargs [Hash] keyword arguments forwarded to {#initialize}
      # @yield [payload, msg_handler] see {#consume}
      # @return [ResponderHandler] see {#consume}
      def self.consume(*attrs, **kwargs, &block)
        new(*attrs, **kwargs).consume(&block)
      end

      # @param thread_pool [#process] runs the per-delivery work passed to it
      #   as a block
      # @param destination [String] name of the queue to subscribe to
      # @param channel [#queue, #acknowledge] channel used to subscribe and to
      #   acknowledge deliveries
      # @param handler_adapter_factory [#for] returns the adapter handed to
      #   MessageHandler for a given delivery
      def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:)
        @consume_thread_pool = thread_pool
        @destination = destination
        @channel = channel
        @handler_adapter_factory = handler_adapter_factory
      end

      # Subscribes to the destination and invokes the block for each delivery.
      #
      # @yieldparam payload [Object] the delivery's payload
      # @yieldparam msg_handler [MessageHandler] handler built from the
      #   delivery and its adapter
      # @return [ResponderHandler] wraps the subscription returned by the
      #   queue and the thread pool
      def consume(&block)
        consumer = consume_from_destination do |delivery|
          adapter = @handler_adapter_factory.for(delivery)

          msg_handler = MessageHandler.new(adapter, delivery)
          block.call(delivery.payload, msg_handler)
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      # Subscribes to the destination queue with manual acknowledgements and
      # routes every delivery through {#process_message}.
      def consume_from_destination(&block)
        @channel.queue(@destination).subscribe(manual_ack: true) do |delivery|
          process_message(delivery, &block)
        end
      end

      # Runs the block for one delivery on the thread pool inside a trace
      # scope, then acknowledges the delivery and closes the scope.
      def process_message(delivery, &block)
        @consume_thread_pool.process do
          # Declared before `begin` so the cleanup can tell whether the trace
          # was actually started.
          scope = nil

          begin
            scope = start_trace(delivery)
            log_received(scope, delivery)

            block.call(delivery)
          ensure
            acknowledge_and_close(delivery, scope)
          end
        end
      end

      # Opens the trace scope describing this delivery.
      def start_trace(delivery)
        delivery.build_trace("freddy:respond:#{@destination}",
          tags: {
            'peer.address': "#{@destination}:#{delivery.payload[:type]}",
            'component': 'freddy',
            'span.kind': 'server' # RPC
          }
        )
      end

      # Records the receipt of the delivery on the scope's span.
      def log_received(scope, delivery)
        scope.span.log_kv(
          event: 'Received message through respond_to',
          queue: @destination,
          payload: delivery.payload,
          correlation_id: delivery.correlation_id
        )
      end

      # Acknowledges the delivery, then closes the trace scope.
      #
      # The scope is closed from an `ensure` so a failing acknowledge cannot
      # leave it open, and only when it exists so that a failure in
      # `build_trace` is not masked by a NoMethodError on nil.
      def acknowledge_and_close(delivery, scope)
        # NOTE(review): the second argument is presumably the "multiple" flag
        # (acknowledge only this delivery) -- confirm against the channel API.
        @channel.acknowledge(delivery.tag, false)
      ensure
        scope.close if scope
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygem

Version Path
freddy-1.4.1 lib/freddy/consumers/respond_to_consumer.rb
freddy-1.4.0 lib/freddy/consumers/respond_to_consumer.rb