Sha256: bc9f71a82ba0837d641eb6f0568617fa33fb410ce9a513be5ede058a6aaca018

Contents?: true

Size: 1.02 KB

Versions: 11

Compression:

Stored size: 1.02 KB

Contents

class Freddy
  module Consumers
    class RespondToConsumer
      def initialize(consume_thread_pool, logger)
        @consume_thread_pool = consume_thread_pool
        @logger = logger
      end

      def consume(destination, channel, handler_adapter_factory, &block)
        consumer = consume_from_destination(destination, channel) do |delivery|
          Consumers.log_receive_event(@logger, destination, delivery)

          adapter = handler_adapter_factory.for(delivery)

          msg_handler = MessageHandler.new(adapter, delivery)
          block.call(delivery.payload, msg_handler)
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      def consume_from_destination(destination, channel, &block)
        channel.queue(destination).subscribe do |delivery|
          process_message(delivery, &block)
        end
      end

      def process_message(delivery, &block)
        @consume_thread_pool.process do
          block.call(delivery)
        end
      end
    end
  end
end

Version data entries

11 entries across 11 versions & 2 rubygems

Version Path
freddy-0.7.0 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.6.3 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.6.2 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.6.1 lib/freddy/consumers/respond_to_consumer.rb
freddy-jruby-0.6.0 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.6.0 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.5.8 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.5.7 lib/freddy/consumers/respond_to_consumer.rb
freddy-jruby-0.5.6 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.5.6 lib/freddy/consumers/respond_to_consumer.rb
freddy-0.5.5 lib/freddy/consumers/respond_to_consumer.rb