Sha256: 414af568babaee41a14d24fd0ca2e23702619164c4e901272ed1978f2138050f
Contents?: true
Size: 1.9 KB
Versions: 2
Compression:
Stored size: 1.9 KB
Contents
# frozen_string_literal: true

class Freddy
  module Consumers
    # Subscribes a block to messages routed through the Freddy topic exchange
    # under a routing-key pattern. Each delivery is handed to the thread pool,
    # wrapped in a trace scope, yielded to the block as
    # (payload, routing_key), and acknowledged afterwards regardless of whether
    # the block raised.
    class TapIntoConsumer
      # Builds a consumer and immediately starts consuming.
      #
      # Keywords are captured and forwarded explicitly: forwarding only
      # `*attrs` turns the caller's keywords into a positional Hash, which
      # Ruby 3 no longer converts back into keywords for #initialize.
      #
      # @param attrs [Array] positional arguments forwarded to #initialize
      # @param kwargs [Hash] keyword arguments forwarded to #initialize
      # @yield [payload, routing_key] invoked for every delivery
      # @return [ResponderHandler] see #consume
      def self.consume(*attrs, **kwargs, &block)
        new(*attrs, **kwargs).consume(&block)
      end

      # @param thread_pool [#process] runs the per-delivery work passed as a block
      # @param pattern [String] routing key used when binding the queue
      # @param channel [#topic, #queue, #acknowledge] messaging channel
      #   (NOTE(review): presumably a Bunny channel - confirm against caller)
      # @param options [Hash] optional settings; only `:group` is read here
      def initialize(thread_pool:, pattern:, channel:, options:)
        @consume_thread_pool = thread_pool
        @pattern = pattern
        @channel = channel
        @options = options
      end

      # Creates and binds the queue, then subscribes to it with manual
      # acknowledgements.
      #
      # @yield [payload, routing_key] invoked for every delivery
      # @return [ResponderHandler] built from the subscription consumer and the
      #   thread pool
      def consume(&block)
        queue = create_queue
        consumer = queue.subscribe(manual_ack: true) do |delivery|
          process_message(queue, delivery, &block)
        end
        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      # Declares the queue and binds it to the topic exchange with @pattern.
      # With a `:group` option the queue is named "groups.<group>"; otherwise
      # an unnamed queue is declared with `exclusive: true`.
      #
      # @return [Object] whatever `bind` returns (the consumer subscribes to it)
      def create_queue
        topic_exchange = @channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)
        group = @options.fetch(:group, nil)
        queue = group ? @channel.queue("groups.#{group}") : @channel.queue('', exclusive: true)
        queue.bind(topic_exchange, routing_key: @pattern)
      end

      # Runs one delivery on the thread pool: opens a trace scope, yields the
      # payload and routing key to the caller's block, then acknowledges the
      # delivery and closes the scope.
      #
      # @param _queue [Object] unused; kept for signature compatibility
      # @param delivery [Object] delivery exposing build_trace, payload,
      #   routing_key, correlation_id and tag
      # @yield [payload, routing_key]
      def process_message(_queue, delivery)
        @consume_thread_pool.process do
          # Initialised outside the begin so the ensure can tell whether the
          # trace was ever opened.
          scope = nil
          begin
            scope = delivery.build_trace("freddy:observe:#{@pattern}",
                                         tags: trace_tags(delivery),
                                         force_follows_from: true)
            yield delivery.payload, delivery.routing_key
          ensure
            begin
              # NOTE(review): second argument is presumably the `multiple`
              # flag (ack only this delivery) - confirm against the channel API.
              @channel.acknowledge(delivery.tag, false)
            ensure
              # Close the scope even when the ack raises; skip it when
              # build_trace itself failed so the original error is not masked
              # by a NoMethodError on nil.
              scope&.close
            end
          end
        end
      end

      # Tags attached to the trace opened for a delivery.
      def trace_tags(delivery)
        {
          'message_bus.destination' => @pattern,
          'message_bus.correlation_id' => delivery.correlation_id,
          'component' => 'freddy',
          'span.kind' => 'consumer' # Message Bus
        }
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
freddy-1.5.0 | lib/freddy/consumers/tap_into_consumer.rb |
freddy-1.4.2 | lib/freddy/consumers/tap_into_consumer.rb |