lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb in basquiat-1.3.0.pre.1 vs lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb in basquiat-1.3.0

- old
+ new

@@ -1,23 +1,25 @@ +# frozen_string_literal: true module Basquiat module Adapters class RabbitMq class DelayedDelivery < BaseStrategy class << self using HashRefinements attr_reader :options def setup(opts) - @options = { ddl: { retries: 5, - exchange_name: 'basquiat.dlx', - queue_name_preffix: 'basquiat.ddlq' } }.deep_merge(opts) + @options = { retries: 5, + exchange_name: 'basquiat.dlx', + queue_name_preffix: 'basquiat.ddlq' }.deep_merge(opts) end end def initialize(session) super setup_delayed_delivery + @queue_name = session.queue_name end def run(message) message.routing_key = extract_event_info(message.routing_key)[0] yield @@ -34,57 +36,54 @@ # @param [#match] key the current routing key of the message # @return [String] the calculated routing key for a republish / requeue def requeue_route_for(key) event_name, timeout = extract_event_info(key) - if timeout == 2**options[:retries] * 1_000 - "rejected.#{session.queue.name}.#{event_name}" + if timeout == max_timeout + "rejected.#{@queue_name}.#{event_name}" else - "#{timeout * 2}.#{session.queue.name}.#{event_name}" + "#{timeout * 2}.#{@queue_name}.#{event_name}" end end # @param [#match] key the current routing key of the message # @return [Array<String, Integer>] a 2 item array composed of the event.name (aka original routing_key) and # the current timeout def extract_event_info(key) - md = key.match(/^(\d+)\.#{session.queue.name}\.(.+)$/) - if md - [md.captures[1], md.captures[0].to_i] - else - # So timeout can turn into 1 second, weird but spares some checking - [key, 500] - end + matched = key.match(/^(\d+)\.#{session.queue.name}\.(.+)$/) + matched && [matched.captures[1], matched.captures[0].to_i] || [key, 500] end def options - self.class.options[:ddl] + self.class.options end def setup_delayed_delivery @exchange = session.channel.topic(options[:exchange_name], durable: true) session.bind_queue("*.#{session.queue.name}.#") prepare_timeout_queues + create_and_bind_rejected_queue + end + + def create_and_bind_rejected_queue queue = session.channel.queue("#{options[:queue_name_preffix]}_rejected", durable: true) queue.bind(@exchange, routing_key: 'rejected.#') end def prepare_timeout_queues - queues = (0..options[:retries] - 1).map do |iteration| + (0..options[:retries] - 1).each do |iteration| timeout = 2**iteration - session.channel.queue("#{options[:queue_name_preffix]}_#{timeout}", durable: true, - arguments: { - 'x-dead-letter-exchange' => session.exchange.name, - 'x-message-ttl' => timeout * 1_000 }) + queue = session.channel.queue("#{options[:queue_name_preffix]}_#{timeout}", + durable: true, + arguments: { + 'x-dead-letter-exchange' => session.exchange.name, + 'x-message-ttl' => timeout * 1_000 }) + queue.bind(@exchange, routing_key: "#{timeout * 1_000}.#") end - bind_timeout_queues(queues) end - def bind_timeout_queues(queues) - queues.each do |queue| - timeout = queue.arguments['x-message-ttl'].to_i - queue.bind(@exchange, routing_key: "#{timeout}.#") - end + def max_timeout + 2**(options[:retries] - 1) * 1_000 end end end end end