lib/bbk/amqp/consumer.rb in bbk-amqp-1.0.0.79544 vs lib/bbk/amqp/consumer.rb in bbk-amqp-1.0.0.79546

- old
+ new

@@ -1,19 +1,20 @@ # frozen_string_literal: true +require 'bbk/amqp/rejection_policies/requeue' module BBK module AMQP class Consumer - attr_reader :connection, :queue_name, :queue, :options, :logger + attr_reader :connection, :queue_name, :queue, :rejection_policy, :options, :logger DEFAULT_OPTIONS = { consumer_pool_size: 3, consumer_pool_abort_on_exception: true, prefetch_size: 10, - requeue_on_reject: false, - consumer_tag: nil + consumer_tag: nil, + rejection_policy: RejectionPolicies::Requeue.new }.freeze PROTOCOLS = %w[mq amqp amqps].freeze def initialize(connection, queue_name: nil, **options) @@ -26,10 +27,11 @@ end @queue_name = @queue&.name || queue_name @options = DEFAULT_OPTIONS.merge(options) + @rejection_policy = @options.delete(:rejection_policy) logger = @options.fetch(:logger, BBK::AMQP.logger) logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger) @logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, queue_name]) end @@ -45,11 +47,11 @@ @channel ||= @connection.create_channel(nil, options[:consumer_pool_size], options[:consumer_pool_abort_on_exception]).tap do |ch| ch.prefetch(options[:prefetch_size]) end - @logger.add_tags "Ch##{@channel.id}" + logger.add_tags "Ch##{@channel.id}" @queue ||= @channel.queue(queue_name, passive: true) subscribe_opts = { block: false, @@ -80,14 +82,11 @@ incoming.delivery_info[:channel].ack incoming.delivery_info[:delivery_tag] end # Nack incoming message # @param incoming [BBK::AMQP::Message] nack procesing message - def nack(incoming, *args, error: nil, requeue: nil, **_kwargs) - logger.debug "Reject message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}. Error: #{error.inspect}" - requeue_message = requeue.nil? ? options[:requeue_on_reject] : requeue - incoming.delivery_info[:channel].reject incoming.delivery_info[:delivery_tag], - requeue_message + def nack(incoming, *args, error: nil, **_kwargs) + rejection_policy.call(incoming, error) end # stop consuming messages def stop @subscription.tap do |s|