Sha256: cfc3dc34b0884f4528b0b51284bc17210f246ef142dcbeb8bc8ef0dc83f13855

Contents?: true

Size: 1.91 KB

Versions: 7

Compression:

Stored size: 1.91 KB

Contents

# frozen_string_literal: true

class Freddy
  module Consumers
    # Binds a queue to a list of routing-key patterns on the Freddy topic
    # exchange and passes every delivery (payload and routing key) to a
    # caller-supplied block. Messages are subscribed with manual
    # acknowledgement; what happens to a message whose handler raised is
    # controlled by the :on_exception option.
    class TapIntoConsumer
      # Builds a consumer and immediately starts consuming.
      #
      # Keyword arguments are captured and forwarded explicitly. A bare
      # `*attrs` splat would collect them into a positional Hash, which Ruby 3
      # does not convert back into keywords for the keyword-only initializer.
      #
      # @param attrs [Array] positional arguments forwarded to #initialize
      # @param kwargs [Hash] keyword arguments forwarded to #initialize
      # @yield [payload, routing_key] invoked for every delivery
      # @return [ResponderHandler] the value returned by #consume
      def self.consume(*attrs, **kwargs, &block)
        new(*attrs, **kwargs).consume(&block)
      end

      # @param thread_pool [#process] object whose #process runs a block for
      #   each delivery
      # @param patterns [#each] routing-key patterns the queue gets bound to
      # @param channel [Object] channel used to declare the exchange/queue and
      #   to acknowledge or reject deliveries
      # @param options [Hash] recognised keys: :group (default nil),
      #   :durable (default false), :on_exception (default :ack)
      # @raise [RuntimeError] when :durable is set without a :group
      def initialize(thread_pool:, patterns:, channel:, options:)
        @consume_thread_pool = thread_pool
        @patterns = patterns
        @channel = channel
        @options = options

        raise 'Do not use durable queues without specifying a group' if durable? && !group
      end

      # Declares and binds the queue, then subscribes to it with manual acks.
      #
      # @yield [payload, routing_key] invoked for every delivery
      # @return [ResponderHandler] wraps the subscription and the thread pool
      def consume(&block)
        queue = create_queue

        consumer = queue.subscribe(manual_ack: true) do |delivery|
          process_message(queue, delivery, &block)
        end

        ResponderHandler.new(consumer, @consume_thread_pool)
      end

      private

      # Declares the queue and binds it to the topic exchange once per pattern.
      # With a group the queue is named "groups.<group>" and its durability
      # follows the :durable option; without one a server-named ('') exclusive
      # queue is requested.
      def create_queue
        topic_exchange = @channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)

        queue =
          if group
            @channel.queue("groups.#{group}", durable: durable?)
          else
            @channel.queue('', exclusive: true)
          end

        @patterns.each do |pattern|
          queue.bind(topic_exchange, routing_key: pattern)
        end

        queue
      end

      # Hands one delivery to the thread pool. Inside the pool the caller's
      # block receives the payload and routing key, and the message is
      # acknowledged afterwards.
      #
      # Any StandardError raised inside the span (including one raised by the
      # acknowledge call itself) is settled according to :on_exception and
      # then re-raised so the thread pool still sees the failure.
      def process_message(_queue, delivery)
        @consume_thread_pool.process do
          delivery.in_span(force_follows_from: true) do
            yield delivery.payload, delivery.routing_key
            @channel.acknowledge(delivery.tag)
          end
        rescue StandardError
          case on_exception
          when :reject
            @channel.reject(delivery.tag)
          when :requeue
            # Second argument is presumably the channel's requeue flag
            # (as in Bunny's Channel#reject) - confirm against the channel API.
            @channel.reject(delivery.tag, true)
          else
            # :ack and any unrecognised value acknowledge the message.
            @channel.acknowledge(delivery.tag)
          end

          raise
        end
      end

      # Consumer group name from the options, or nil when none was given.
      def group
        @options.fetch(:group, nil)
      end

      # Whether a durable queue was requested; defaults to false.
      def durable?
        @options.fetch(:durable, false)
      end

      # Failure policy for a raising handler; defaults to :ack.
      def on_exception
        @options.fetch(:on_exception, :ack)
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
freddy-2.2.4 lib/freddy/consumers/tap_into_consumer.rb
freddy-2.2.3 lib/freddy/consumers/tap_into_consumer.rb
freddy-2.2.2 lib/freddy/consumers/tap_into_consumer.rb
freddy-2.2.1 lib/freddy/consumers/tap_into_consumer.rb
freddy-2.2.0 lib/freddy/consumers/tap_into_consumer.rb
freddy-2.1.0 lib/freddy/consumers/tap_into_consumer.rb
freddy-2.0.0 lib/freddy/consumers/tap_into_consumer.rb