lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb in basquiat-1.3.0.pre.1
- old
+ new
@@ -1,25 +1,89 @@
module Basquiat
module Adapters
class RabbitMq
- module Strategies
- class DelayedDeliveryWIP
- def initialize(channel, message)
- @channel = channel
- @message = message
+ class DelayedDelivery < BaseStrategy
+ class << self
+ using HashRefinements
+ attr_reader :options
+
+ def setup(opts)
+ @options = { ddl: { retries: 5,
+ exchange_name: 'basquiat.dlx',
+ queue_name_preffix: 'basquiat.ddlq' } }.deep_merge(opts)
end
+ end
- # Criar um exchange
- # Criar o queue (ou redeclara-lo)
- # O queue tem que ter um dlx para o exchange padrĂ£o
- # Publicar a mensagem no exchange com um ttl igual ao anterior **2
- # dar um unack caso o tempo estoure o maximo.
- def message_handler
- delay = message[:headers][0][:expiration]**2
- exchange = channel.topic('basquiat.dd')
- queue = channel.queue('delay', ttl: delay * 2)
- queue.bind(exchange, 'original_queue.delay.message_name')
- exchange.publish('original_queue.delay.message_name', message, ttl: delay, dlx: default_exchange)
+ def initialize(session)
+ super
+ setup_delayed_delivery
+ end
+
+ def run(message)
+ message.routing_key = extract_event_info(message.routing_key)[0]
+ yield
+ public_send(message.action, message)
+ end
+
+ # @param [Message] message the, well, message to be requeued
+ def requeue(message)
+ @exchange.publish(Basquiat::Json.encode(message), routing_key: requeue_route_for(message.di.routing_key))
+ ack(message)
+ end
+
+ private
+
+ # @param [#match] key the current routing key of the message
+ # @return [String] the calculated routing key for a republish / requeue
+ def requeue_route_for(key)
+ event_name, timeout = extract_event_info(key)
+ if timeout == 2**options[:retries] * 1_000
+ "rejected.#{session.queue.name}.#{event_name}"
+ else
+ "#{timeout * 2}.#{session.queue.name}.#{event_name}"
+ end
+ end
+
+ # @param [#match] key the current routing key of the message
+ # @return [Array<String, Integer>] a 2 item array composed of the event.name (aka original routing_key) and
+ # the current timeout
+ def extract_event_info(key)
+ md = key.match(/^(\d+)\.#{session.queue.name}\.(.+)$/)
+ if md
+ [md.captures[1], md.captures[0].to_i]
+ else
+ # So timeout can turn into 1 second, weird but spares some checking
+ [key, 500]
+ end
+ end
+
+ def options
+ self.class.options[:ddl]
+ end
+
+ def setup_delayed_delivery
+ @exchange = session.channel.topic(options[:exchange_name], durable: true)
+ session.bind_queue("*.#{session.queue.name}.#")
+ prepare_timeout_queues
+ queue = session.channel.queue("#{options[:queue_name_preffix]}_rejected", durable: true)
+ queue.bind(@exchange, routing_key: 'rejected.#')
+ end
+
+ def prepare_timeout_queues
+ queues = (0..options[:retries] - 1).map do |iteration|
+ timeout = 2**iteration
+ session.channel.queue("#{options[:queue_name_preffix]}_#{timeout}", durable: true,
+ arguments: {
+ 'x-dead-letter-exchange' => session.exchange.name,
+ 'x-message-ttl' => timeout * 1_000 })
+ end
+ bind_timeout_queues(queues)
+ end
+
+ def bind_timeout_queues(queues)
+ queues.each do |queue|
+ timeout = queue.arguments['x-message-ttl'].to_i
+ queue.bind(@exchange, routing_key: "#{timeout}.#")
end
end
end
end
end