Sha256: 94b3da88a3d5754ebb490f298a2645bd5b671ed2c5148328eb95e9e9177a81c4
Contents?: true
Size: 867 Bytes
Versions: 8
Compression:
Stored size: 867 Bytes
Contents
class Freddy
  module Consumers
    class TapIntoConsumer
      def initialize(consume_thread_pool)
        @consume_thread_pool = consume_thread_pool
      end

      def consume(pattern, channel, &block)
        queue = create_queue(pattern, channel)

        consumer = queue.subscribe do |delivery|
          process_message(delivery, &block)
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      def create_queue(pattern, channel)
        topic_exchange = channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)

        channel
          .queue('', exclusive: true)
          .bind(topic_exchange, routing_key: pattern)
      end

      def process_message(delivery, &block)
        @consume_thread_pool.process do
          block.call delivery.payload, delivery.routing_key
        end
      end
    end
  end
end
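The flow in the file above (bind an exclusive queue to a topic exchange, subscribe, and hand each delivery to a thread pool that calls the block with the payload and routing key) can be traced without a broker. The following is a minimal sketch, not the gem's implementation: `InlinePool`, `FakeQueue`, `FakeChannel`, `Delivery`, and `tap_into` are hypothetical stand-ins, the exchange name `'freddy-topic'` is an assumption, `ResponderHandler` is left out, and the fake queue does no routing-key matching.

```ruby
# Hypothetical stand-ins for illustration; none of these come from the gem.

# Runs each job immediately instead of handing it to a worker thread.
class InlinePool
  def process
    yield
  end
end

# Carries the two fields the consumer reads from a delivery.
Delivery = Struct.new(:payload, :routing_key)

class FakeQueue
  attr_reader :bindings

  def initialize
    @bindings = []
    @subscriber = nil
  end

  # Records the binding and returns self so calls can be chained.
  def bind(exchange, routing_key:)
    @bindings << [exchange, routing_key]
    self
  end

  def subscribe(&block)
    @subscriber = block
    :fake_consumer
  end

  # Pretends the broker delivered a message (no pattern matching is done).
  def deliver(payload, routing_key)
    @subscriber.call(Delivery.new(payload, routing_key))
  end
end

class FakeChannel
  attr_reader :last_queue

  def topic(name)
    "exchange:#{name}"
  end

  def queue(_name, exclusive:)
    @last_queue = FakeQueue.new
  end
end

# Mirrors the shape of consume/create_queue/process_message in the listing.
def tap_into(pattern, channel, pool, &block)
  exchange = channel.topic('freddy-topic') # assumed exchange name
  queue = channel.queue('', exclusive: true).bind(exchange, routing_key: pattern)
  queue.subscribe do |delivery|
    pool.process { block.call(delivery.payload, delivery.routing_key) }
  end
end

RECEIVED = []
CHANNEL = FakeChannel.new
tap_into('users.*', CHANNEL, InlinePool.new) { |payload, key| RECEIVED << [payload, key] }
CHANNEL.last_queue.deliver('{"id":1}', 'users.created')
```

After the simulated delivery, `RECEIVED` holds one pair of payload and routing key, which is the argument order the block receives in `process_message`.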
Version data entries
8 entries across 8 versions & 2 rubygems