lib/basquiat/adapters/rabbitmq/requeue_strategies/dead_lettering.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq/requeue_strategies/dead_lettering.rb in basquiat-1.3.0.pre.1

- old
+ new

@@ -5,14 +5,15 @@ class << self attr_reader :options def setup(opts) @options = { - session: { queue: { - options: { 'x-dead-letter-exchange' => opts.fetch(:exchange, 'basquiat.dlx') } - } }, - dlx: { ttl: opts.fetch(:ttl, 1_000) } } + session: { + queue: { + options: { + 'x-dead-letter-exchange' => opts.fetch(:exchange, 'basquiat.dlx') } } }, + dlx: { ttl: opts.fetch(:ttl, 1_000) } } end def session_options options.fetch :session rescue KeyError @@ -25,33 +26,33 @@ setup_dead_lettering end def run(message) catch :skip_processing do - check_incoming_messages(message) + check_incoming_messages(message.props.headers) yield end - public_send(message.action, message.delivery_tag) + public_send(message.action, message) end private - def check_incoming_messages(message) - message.props.headers and - message.props.headers['x-death'][1]['queue'] != @session.queue.name and - throw(:skip_processing) + def check_incoming_messages(headers) + headers && + headers['x-death'][1]['queue'] != session.queue.name && + throw(:skip_processing) end def options self.class.options end def setup_dead_lettering - dlx = @session.channel.topic('basquiat.dlx') - queue = @session.channel.queue('basquiat.dlq', - arguments: { 'x-dead-letter-exchange' => @session.exchange.name, - 'x-message-ttl' => options[:dlx][:ttl] }) - queue.bind(dlx, routing_key: '*.#') + dlx = session.channel.topic('basquiat.dlx') + queue = session.channel.queue('basquiat.dlq', + arguments: { 'x-dead-letter-exchange' => session.exchange.name, + 'x-message-ttl' => options[:dlx][:ttl] }) + queue.bind(dlx, routing_key: '#') end end end end end