Sha256: c2ee2a677b0d6b817a9f553c75cea54936d077206ae7fd65889689f1d870fc5f

Contents?: true

Size: 1.2 KB

Versions: 3

Compression:

Stored size: 1.2 KB

Contents

# frozen_string_literal: true

class Freddy
  module Consumers
    class RespondToConsumer
      def self.consume(**attrs, &block)
        new(**attrs).consume(&block)
      end

      def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:)
        @consume_thread_pool = thread_pool
        @destination = destination
        @channel = channel
        @handler_adapter_factory = handler_adapter_factory
      end

      def consume
        consumer = consume_from_destination do |delivery|
          adapter = @handler_adapter_factory.for(delivery)

          msg_handler = MessageHandler.new(adapter, delivery)
          yield(delivery.payload, msg_handler)
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      def consume_from_destination(&block)
        @channel.queue(@destination).subscribe(manual_ack: true) do |delivery|
          process_message(delivery, &block)
        end
      end

      def process_message(delivery)
        @consume_thread_pool.post do
          delivery.in_span do
            yield(delivery)
          end
        ensure
          @channel.acknowledge(delivery.tag, false)
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
freddy-2.8.0 lib/freddy/consumers/respond_to_consumer.rb
freddy-2.7.0 lib/freddy/consumers/respond_to_consumer.rb
freddy-2.6.0 lib/freddy/consumers/respond_to_consumer.rb