lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb in basquiat-1.3.0.pre.1 vs lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb in basquiat-1.3.0
- old
+ new
@@ -1,23 +1,25 @@
+# frozen_string_literal: true
module Basquiat
module Adapters
class RabbitMq
class DelayedDelivery < BaseStrategy
class << self
using HashRefinements
attr_reader :options
def setup(opts)
- @options = { ddl: { retries: 5,
- exchange_name: 'basquiat.dlx',
- queue_name_preffix: 'basquiat.ddlq' } }.deep_merge(opts)
+ @options = { retries: 5,
+ exchange_name: 'basquiat.dlx',
+ queue_name_preffix: 'basquiat.ddlq' }.deep_merge(opts)
end
end
def initialize(session)
super
setup_delayed_delivery
+ @queue_name = session.queue_name
end
def run(message)
message.routing_key = extract_event_info(message.routing_key)[0]
yield
@@ -34,57 +36,54 @@
# @param [#match] key the current routing key of the message
# @return [String] the calculated routing key for a republish / requeue
def requeue_route_for(key)
event_name, timeout = extract_event_info(key)
- if timeout == 2**options[:retries] * 1_000
- "rejected.#{session.queue.name}.#{event_name}"
+ if timeout == max_timeout
+ "rejected.#{@queue_name}.#{event_name}"
else
- "#{timeout * 2}.#{session.queue.name}.#{event_name}"
+ "#{timeout * 2}.#{@queue_name}.#{event_name}"
end
end
# @param [#match] key the current routing key of the message
# @return [Array<String, Integer>] a 2 item array composed of the event.name (aka original routing_key) and
# the current timeout
def extract_event_info(key)
- md = key.match(/^(\d+)\.#{session.queue.name}\.(.+)$/)
- if md
- [md.captures[1], md.captures[0].to_i]
- else
- # So timeout can turn into 1 second, weird but spares some checking
- [key, 500]
- end
+ matched = key.match(/^(\d+)\.#{session.queue.name}\.(.+)$/)
+ matched && [matched.captures[1], matched.captures[0].to_i] || [key, 500]
end
def options
- self.class.options[:ddl]
+ self.class.options
end
def setup_delayed_delivery
@exchange = session.channel.topic(options[:exchange_name], durable: true)
session.bind_queue("*.#{session.queue.name}.#")
prepare_timeout_queues
+ create_and_bind_rejected_queue
+ end
+
+ def create_and_bind_rejected_queue
queue = session.channel.queue("#{options[:queue_name_preffix]}_rejected", durable: true)
queue.bind(@exchange, routing_key: 'rejected.#')
end
def prepare_timeout_queues
- queues = (0..options[:retries] - 1).map do |iteration|
+ (0..options[:retries] - 1).each do |iteration|
timeout = 2**iteration
- session.channel.queue("#{options[:queue_name_preffix]}_#{timeout}", durable: true,
- arguments: {
- 'x-dead-letter-exchange' => session.exchange.name,
- 'x-message-ttl' => timeout * 1_000 })
+ queue = session.channel.queue("#{options[:queue_name_preffix]}_#{timeout}",
+ durable: true,
+ arguments: {
+ 'x-dead-letter-exchange' => session.exchange.name,
+ 'x-message-ttl' => timeout * 1_000 })
+ queue.bind(@exchange, routing_key: "#{timeout * 1_000}.#")
end
- bind_timeout_queues(queues)
end
- def bind_timeout_queues(queues)
- queues.each do |queue|
- timeout = queue.arguments['x-message-ttl'].to_i
- queue.bind(@exchange, routing_key: "#{timeout}.#")
- end
+ def max_timeout
+ 2**(options[:retries] - 1) * 1_000
end
end
end
end
end