Sha256: 94b3da88a3d5754ebb490f298a2645bd5b671ed2c5148328eb95e9e9177a81c4

Contents?: true

Size: 867 Bytes

Versions: 8

Compression:

Stored size: 867 Bytes

Contents

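class Freddy
  module Consumers
    # Subscribes to messages on the Freddy topic exchange whose routing key
    # matches a pattern and passes each one to a caller-supplied block.
    class TapIntoConsumer
      # consume_thread_pool must respond to #process and accept a block.
      def initialize(consume_thread_pool)
        @consume_thread_pool = consume_thread_pool
      end

      # Binds a new queue to +pattern+ and calls +block+ with the payload and
      # routing key of every delivery. Returns a ResponderHandler.
      def consume(pattern, channel, &block)
        queue = create_queue(pattern, channel)

        consumer = queue.subscribe do |delivery|
          process_message(delivery, &block)
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      # The empty name asks the broker to generate one; the queue is exclusive
      # to this connection.
      def create_queue(pattern, channel)
        topic_exchange = channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)

        channel
          .queue('', exclusive: true)
          .bind(topic_exchange, routing_key: pattern)
      end

      # Hands the block call to the thread pool.
      def process_message(delivery, &block)
        @consume_thread_pool.process do
          block.call delivery.payload, delivery.routing_key
        end
      end
    end
  end
end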
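
The listing above only needs a few methods from its collaborators: `process` on the pool, `topic` and `queue` on the channel, `bind` and `subscribe` on the queue, and `payload` and `routing_key` on each delivery. The following is a minimal sketch of how the class could be exercised without a broker, assuming those are the only calls that matter. `InlinePool`, `FakeChannel`, `FakeQueue`, `FakeDelivery`, the `ResponderHandler` struct and the exchange name value are all hypothetical stand-ins, not part of the gem, and the class body is repeated in condensed form only so the block runs on its own.

```ruby
# Everything here except the TapIntoConsumer logic is a hypothetical stand-in.
class Freddy
  FREDDY_TOPIC_EXCHANGE_NAME = 'example-topic-exchange' # placeholder value

  module Consumers
    # Placeholder for the gem's ResponderHandler; it only stores its arguments.
    ResponderHandler = Struct.new(:consumer, :pool)

    # Same logic as the listing, condensed so this block is self-contained.
    class TapIntoConsumer
      def initialize(consume_thread_pool)
        @consume_thread_pool = consume_thread_pool
      end

      def consume(pattern, channel, &block)
        queue = channel
                .queue('', exclusive: true)
                .bind(channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME), routing_key: pattern)

        consumer = queue.subscribe do |delivery|
          @consume_thread_pool.process do
            block.call delivery.payload, delivery.routing_key
          end
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end
    end
  end
end

# Pool that runs each job immediately on the calling thread.
class InlinePool
  def process
    yield
  end
end

FakeDelivery = Struct.new(:payload, :routing_key)

# Queue that remembers its binding and subscriber so messages can be pushed in.
class FakeQueue
  attr_reader :routing_key

  def bind(_exchange, routing_key:)
    @routing_key = routing_key
    self
  end

  def subscribe(&handler)
    @handler = handler
    :fake_consumer
  end

  def deliver(payload, routing_key)
    @handler.call(FakeDelivery.new(payload, routing_key))
  end
end

# Channel that hands out a single FakeQueue and keeps a reference to it.
class FakeChannel
  attr_reader :last_queue

  def topic(name)
    name
  end

  def queue(_name, exclusive:)
    @last_queue = FakeQueue.new
  end
end

channel = FakeChannel.new
received = []

Freddy::Consumers::TapIntoConsumer
  .new(InlinePool.new)
  .consume('orders.*', channel) { |payload, key| received << [payload, key] }

# Simulate the broker delivering one message to the bound queue.
channel.last_queue.deliver({ 'id' => 1 }, 'orders.created')
```

The fake queue does no pattern matching of its own, so this only checks the wiring: the pattern reaches `bind`, and each delivery reaches the block with its payload and routing key. Swapping `InlinePool` for a real pool would move the block call off the delivering thread.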

Version data entries

8 entries across 8 versions & 2 rubygems

Version Path
freddy-0.5.3 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.5.2 lib/freddy/consumers/tap_into_consumer.rb
freddy-jruby-0.5.1 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.5.1 lib/freddy/consumers/tap_into_consumer.rb
freddy-jruby-0.5.0 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.5.0 lib/freddy/consumers/tap_into_consumer.rb
freddy-jruby-0.4.9 lib/freddy/consumers/tap_into_consumer.rb
freddy-0.4.9 lib/freddy/consumers/tap_into_consumer.rb