Sha256: cfc3dc34b0884f4528b0b51284bc17210f246ef142dcbeb8bc8ef0dc83f13855
Contents?: true
Size: 1.91 KB
Versions: 7
Compression:
Stored size: 1.91 KB
Contents
# frozen_string_literal: true

class Freddy
  module Consumers
    # Subscribes a queue to a list of routing-key patterns on the Freddy topic
    # exchange and passes every delivery to a caller-supplied block.
    #
    # Keys read from +options+ (all optional):
    # * +:group+ - when present the queue is declared as "groups.<group>";
    #   otherwise a queue with an empty name is declared with
    #   +exclusive: true+.
    # * +:durable+ - forwarded as the +durable:+ flag of a group queue
    #   (defaults to +false+). Only allowed together with +:group+.
    # * +:on_exception+ - what to do with a delivery whose block raised:
    #   +:reject+, +:requeue+, or anything else (default +:ack+) to
    #   acknowledge it.
    class TapIntoConsumer
      # Builds a consumer and starts consuming in a single call.
      #
      # Keyword arguments are captured and forwarded explicitly: with a bare
      # +*attrs+ splat Ruby 3 turns the keywords into one positional Hash,
      # which the keyword-only +initialize+ then rejects with ArgumentError.
      #
      # @param attrs [Array] positional arguments forwarded to +new+
      # @param kwargs [Hash] keyword arguments forwarded to +new+
      # @yield [payload, routing_key] invoked for every delivery
      # @return [ResponderHandler]
      def self.consume(*attrs, **kwargs, &block)
        new(*attrs, **kwargs).consume(&block)
      end

      # @param thread_pool [#process] runs the per-delivery work
      # @param patterns [Enumerable] routing keys the queue is bound with
      # @param channel [Object] channel used to declare the queue and to
      #   acknowledge / reject deliveries
      # @param options [Hash] see the class documentation
      # @raise [RuntimeError] when +:durable+ is set without a +:group+
      def initialize(thread_pool:, patterns:, channel:, options:)
        @consume_thread_pool = thread_pool
        @patterns = patterns
        @channel = channel
        @options = options

        raise 'Do not use durable queues without specifying a group' if durable? && !group
      end

      # Declares and binds the queue, then subscribes to it with manual
      # acknowledgements.
      #
      # @yield [payload, routing_key] invoked for every delivery
      # @return [ResponderHandler] wraps the subscription and the thread pool
      def consume(&block)
        queue = create_queue
        consumer = queue.subscribe(manual_ack: true) do |delivery|
          process_message(queue, delivery, &block)
        end
        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      # Declares the queue (group queue or exclusive queue) and binds it to
      # the topic exchange once per pattern.
      def create_queue
        topic_exchange = @channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)

        queue =
          if group
            @channel.queue("groups.#{group}", durable: durable?)
          else
            @channel.queue('', exclusive: true)
          end

        @patterns.each do |pattern|
          queue.bind(topic_exchange, routing_key: pattern)
        end

        queue
      end

      # Runs the caller's block for one delivery on the thread pool and
      # acknowledges it afterwards. Any StandardError raised inside the span
      # (by the block or by the acknowledgement itself) settles the delivery
      # according to +:on_exception+ and is then re-raised to the pool.
      def process_message(_queue, delivery)
        @consume_thread_pool.process do
          delivery.in_span(force_follows_from: true) do
            yield delivery.payload, delivery.routing_key
            @channel.acknowledge(delivery.tag)
          end
        rescue StandardError
          settle_failed_delivery(delivery.tag)
          raise
        end
      end

      # Applies the configured +:on_exception+ policy to a failed delivery.
      def settle_failed_delivery(tag)
        case on_exception
        when :reject
          @channel.reject(tag)
        when :requeue
          # NOTE(review): the second argument is presumably the requeue flag
          # of the channel's reject call - confirm against the channel API.
          @channel.reject(tag, true)
        else
          @channel.acknowledge(tag)
        end
      end

      def group
        @options.fetch(:group, nil)
      end

      def durable?
        @options.fetch(:durable, false)
      end

      def on_exception
        @options.fetch(:on_exception, :ack)
      end
    end
  end
end
Version data entries
7 entries across 7 versions & 1 rubygems