lib/basquiat/adapters/rabbitmq/requeue_strategies/dead_lettering.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq/requeue_strategies/dead_lettering.rb in basquiat-1.3.0.pre.1
- old
+ new
@@ -5,14 +5,15 @@
class << self
attr_reader :options
def setup(opts)
@options = {
- session: { queue: {
- options: { 'x-dead-letter-exchange' => opts.fetch(:exchange, 'basquiat.dlx') }
- } },
- dlx: { ttl: opts.fetch(:ttl, 1_000) } }
+ session: {
+ queue: {
+ options: {
+ 'x-dead-letter-exchange' => opts.fetch(:exchange, 'basquiat.dlx') } } },
+ dlx: { ttl: opts.fetch(:ttl, 1_000) } }
end
def session_options
options.fetch :session
rescue KeyError
@@ -25,33 +26,33 @@
setup_dead_lettering
end
def run(message)
catch :skip_processing do
- check_incoming_messages(message)
+ check_incoming_messages(message.props.headers)
yield
end
- public_send(message.action, message.delivery_tag)
+ public_send(message.action, message)
end
private
- def check_incoming_messages(message)
- message.props.headers and
- message.props.headers['x-death'][1]['queue'] != @session.queue.name and
- throw(:skip_processing)
+ def check_incoming_messages(headers)
+ headers &&
+ headers['x-death'][1]['queue'] != session.queue.name &&
+ throw(:skip_processing)
end
def options
self.class.options
end
def setup_dead_lettering
- dlx = @session.channel.topic('basquiat.dlx')
- queue = @session.channel.queue('basquiat.dlq',
- arguments: { 'x-dead-letter-exchange' => @session.exchange.name,
- 'x-message-ttl' => options[:dlx][:ttl] })
- queue.bind(dlx, routing_key: '*.#')
+ dlx = session.channel.topic('basquiat.dlx')
+ queue = session.channel.queue('basquiat.dlq',
+ arguments: { 'x-dead-letter-exchange' => session.exchange.name,
+ 'x-message-ttl' => options[:dlx][:ttl] })
+ queue.bind(dlx, routing_key: '#')
end
end
end
end
end