lib/bbk/amqp/consumer.rb in bbk-amqp-1.0.0.79544 vs lib/bbk/amqp/consumer.rb in bbk-amqp-1.0.0.79546
- old
+ new
@@ -1,19 +1,20 @@
# frozen_string_literal: true
+require 'bbk/amqp/rejection_policies/requeue'
module BBK
module AMQP
class Consumer
- attr_reader :connection, :queue_name, :queue, :options, :logger
+ attr_reader :connection, :queue_name, :queue, :rejection_policy, :options, :logger
DEFAULT_OPTIONS = {
consumer_pool_size: 3,
consumer_pool_abort_on_exception: true,
prefetch_size: 10,
- requeue_on_reject: false,
- consumer_tag: nil
+ consumer_tag: nil,
+ rejection_policy: RejectionPolicies::Requeue.new
}.freeze
PROTOCOLS = %w[mq amqp amqps].freeze
def initialize(connection, queue_name: nil, **options)
@@ -26,10 +27,11 @@
end
@queue_name = @queue&.name || queue_name
@options = DEFAULT_OPTIONS.merge(options)
+ @rejection_policy = @options.delete(:rejection_policy)
logger = @options.fetch(:logger, BBK::AMQP.logger)
logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger)
@logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, queue_name])
end
@@ -45,11 +47,11 @@
@channel ||= @connection.create_channel(nil, options[:consumer_pool_size],
options[:consumer_pool_abort_on_exception]).tap do |ch|
ch.prefetch(options[:prefetch_size])
end
- @logger.add_tags "Ch##{@channel.id}"
+ logger.add_tags "Ch##{@channel.id}"
@queue ||= @channel.queue(queue_name, passive: true)
subscribe_opts = {
block: false,
@@ -80,14 +82,11 @@
incoming.delivery_info[:channel].ack incoming.delivery_info[:delivery_tag]
end
# Nack incoming message
# @param incoming [BBK::AMQP::Message] nack procesing message
- def nack(incoming, *args, error: nil, requeue: nil, **_kwargs)
- logger.debug "Reject message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}. Error: #{error.inspect}"
- requeue_message = requeue.nil? ? options[:requeue_on_reject] : requeue
- incoming.delivery_info[:channel].reject incoming.delivery_info[:delivery_tag],
- requeue_message
+ def nack(incoming, *args, error: nil, **_kwargs)
+ rejection_policy.call(incoming, error)
end
# stop consuming messages
def stop
@subscription.tap do |s|