Sha256: 9d7a387f5589238d73ea821fda998482a3161a8948e68804de120598fa5b8be7
Contents?: true
Size: 1.4 KB
Versions: 2
Compression:
Stored size: 1.4 KB
Contents
module BBK module AMQP module RejectionPolicies class Republish REPUBLISH_COUNTER_KEY = 'x-republish-count'.freeze attr_reader :logger def initialize(publisher, logger: BBK::Utils::Logger.default) @publisher = publisher @logger = ActiveSupport::TaggedLogging.new(logger).tagged(self.class.name) end def call(message, error, *_args, **_kwargs) if message.delivery_info[:redelivered] || message.headers.key?(REPUBLISH_COUNTER_KEY) republish_message(message, error) else requeue_message(message, error) end end def requeue_message(message, error) logger.warn "Requeue message #{message.headers[:type]}[#{message.headers[:message_id]}] delivery tag: #{message.delivery_info[:delivery_tag].to_i}. Error: #{error.inspect}" message.delivery_info[:channel].reject message.delivery_info[:delivery_tag], true end def republish_message(message, error) logger.warn "Republish message #{message.headers[:type]}[#{message.headers[:message_id]}]. Error: #{error.inspect}" msg = message.clone msg.headers[REPUBLISH_COUNTER_KEY] = msg.headers.fetch(REPUBLISH_COUNTER_KEY, 0).to_i + 1 @publisher.publish_message(msg.delivery_info[:queue], msg, exchange: '').value! message.ack end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
bbk-amqp-1.1.1.273631 | lib/bbk/amqp/rejection_policies/republish.rb |
bbk-amqp-1.1.1.273342 | lib/bbk/amqp/rejection_policies/republish.rb |