Sha256: 3408200c2e8c743460074867861145d3d5307fe0e40fcc86dfbf482bfbe5d075

Contents?: true

Size: 983 Bytes

Versions: 10

Compression:

Stored size: 983 Bytes

Contents

class Freddy
  module Consumers
    # Subscribes to deliveries whose routing key matches a pattern on the
    # Freddy topic exchange and passes each one to a caller-supplied block.
    class TapIntoConsumer
      # @param consume_thread_pool [#process] object whose +process+ method is
      #   handed a block for every received delivery; it is also passed on to
      #   the ResponderHandler returned by {#consume}
      # @param logger [Object] forwarded as-is to +Consumers.log_receive_event+
      def initialize(consume_thread_pool, logger)
        @logger = logger
        @consume_thread_pool = consume_thread_pool
      end

      # Declares a queue bound to the topic exchange with +pattern+ and
      # subscribes to it.
      #
      # @param pattern [Object] value used as the binding's +routing_key+
      # @param channel [#topic, #queue] channel used to look up the exchange
      #   and declare the queue
      # @yield [payload, routing_key] called for each delivery from within the
      #   block given to the thread pool's +process+
      # @return [ResponderHandler] built from the subscription result and the
      #   thread pool
      # @raise [ArgumentError] if no block is given
      def consume(pattern, channel, &block)
        # Fail fast, before any queue is declared. Without this guard a missing
        # block would only show up as a NoMethodError on nil each time a
        # delivery is processed, far away from the call that caused it.
        raise ArgumentError, 'a block is required to consume messages' unless block

        queue = create_queue(pattern, channel)

        consumer = queue.subscribe do |delivery|
          process_message(queue, delivery, &block)
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      # Declares a queue with an empty name and +exclusive: true+, then binds
      # it to the Freddy topic exchange using +pattern+ as the routing key.
      #
      # @return [Object] whatever +bind+ returns (used as the queue by #consume)
      def create_queue(pattern, channel)
        topic_exchange = channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)

        channel
          .queue('', exclusive: true)
          .bind(topic_exchange, routing_key: pattern)
      end

      # Hands one delivery to the thread pool: logs the receive event, then
      # calls the consumer block with the delivery's payload and routing key.
      def process_message(queue, delivery, &block)
        @consume_thread_pool.process do
          Consumers.log_receive_event(@logger, queue.name, delivery)
          block.call delivery.payload, delivery.routing_key
        end
      end
    end
  end
end

Version data entries

10 entries across 10 versions & 2 rubygems

Version Path
freddy-0.6.3 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.6.2 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.6.1 lib/freddy/consumers/tap_into_consumer.rb
freddy-jruby-0.6.0 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.6.0 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.5.8 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.5.7 lib/freddy/consumers/tap_into_consumer.rb
freddy-jruby-0.5.6 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.5.6 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.5.5 lib/freddy/consumers/tap_into_consumer.rb