lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb in basquiat-1.2.0 vs lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb in basquiat-1.3.0.pre.1

- old
+ new

@@ -1,25 +1,89 @@ module Basquiat module Adapters class RabbitMq - module Strategies - class DelayedDeliveryWIP - def initialize(channel, message) - @channel = channel - @message = message + class DelayedDelivery < BaseStrategy + class << self + using HashRefinements + attr_reader :options + + def setup(opts) + @options = { ddl: { retries: 5, + exchange_name: 'basquiat.dlx', + queue_name_preffix: 'basquiat.ddlq' } }.deep_merge(opts) end + end - # Criar um exchange - # Criar o queue (ou redeclara-lo) - # O queue tem que ter um dlx para o exchange padrĂ£o - # Publicar a mensagem no exchange com um ttl igual ao anterior **2 - # dar um unack caso o tempo estoure o maximo. - def message_handler - delay = message[:headers][0][:expiration]**2 - exchange = channel.topic('basquiat.dd') - queue = channel.queue('delay', ttl: delay * 2) - queue.bind(exchange, 'original_queue.delay.message_name') - exchange.publish('original_queue.delay.message_name', message, ttl: delay, dlx: default_exchange) + def initialize(session) + super + setup_delayed_delivery + end + + def run(message) + message.routing_key = extract_event_info(message.routing_key)[0] + yield + public_send(message.action, message) + end + + # @param [Message] message the, well, message to be requeued + def requeue(message) + @exchange.publish(Basquiat::Json.encode(message), routing_key: requeue_route_for(message.di.routing_key)) + ack(message) + end + + private + + # @param [#match] key the current routing key of the message + # @return [String] the calculated routing key for a republish / requeue + def requeue_route_for(key) + event_name, timeout = extract_event_info(key) + if timeout == 2**options[:retries] * 1_000 + "rejected.#{session.queue.name}.#{event_name}" + else + "#{timeout * 2}.#{session.queue.name}.#{event_name}" + end + end + + # @param [#match] key the current routing key of the message + # @return [Array<String, Integer>] a 2 item array composed of the event.name (aka original routing_key) and + # the current timeout + def extract_event_info(key) + md = key.match(/^(\d+)\.#{session.queue.name}\.(.+)$/) + if md + [md.captures[1], md.captures[0].to_i] + else + # So timeout can turn into 1 second, weird but spares some checking + [key, 500] + end + end + + def options + self.class.options[:ddl] + end + + def setup_delayed_delivery + @exchange = session.channel.topic(options[:exchange_name], durable: true) + session.bind_queue("*.#{session.queue.name}.#") + prepare_timeout_queues + queue = session.channel.queue("#{options[:queue_name_preffix]}_rejected", durable: true) + queue.bind(@exchange, routing_key: 'rejected.#') + end + + def prepare_timeout_queues + queues = (0..options[:retries] - 1).map do |iteration| + timeout = 2**iteration + session.channel.queue("#{options[:queue_name_preffix]}_#{timeout}", durable: true, + arguments: { + 'x-dead-letter-exchange' => session.exchange.name, + 'x-message-ttl' => timeout * 1_000 }) + end + bind_timeout_queues(queues) + end + + def bind_timeout_queues(queues) + queues.each do |queue| + timeout = queue.arguments['x-message-ttl'].to_i + queue.bind(@exchange, routing_key: "#{timeout}.#") end end end end end