# frozen_string_literal: true

module AdvancedSneakersActiveJob
  # This publisher relies on TTL to keep messages in a queue.
  # When TTL is reached, messages go to another exchange (see dlx_exchange_name param).
  #
  # Messages are published to a headers exchange named "<exchange>-delayed".
  # Queues holding delayed messages are declared per distinct 'delay' header
  # value, with that delay as their message TTL and the original exchange as
  # their dead-letter exchange.
  class DelayedPublisher < ::BunnyPublisher::Base
    include ::BunnyPublisher::Mandatory

    delegate :logger, to: :'::ActiveJob::Base'
    delegate :name_prefix, :delayed_queue_prefix, to: :'AdvancedSneakersActiveJob.config', prefix: :config

    before_publish :log_message

    # Name of the exchange that expired (dead-lettered) messages are sent to.
    attr_reader :dlx_exchange_name

    # @param exchange [String] name of the original exchange; it becomes the
    #   dead-letter exchange of every delayed queue, while this publisher
    #   itself publishes to the "<exchange>-delayed" headers exchange
    # @param options [Hash] remaining options, forwarded to the parent initializer
    def initialize(exchange:, **options)
      publisher_options = options.merge(
        exchange: [exchange, 'delayed'].join('-'),
        exchange_options: { type: 'headers', durable: true }
      )

      # Double-splat so the options reach the parent as keyword arguments.
      # Ruby 3+ no longer converts a positional Hash into keywords, so passing
      # the hash itself raises ArgumentError when the parent declares keyword
      # parameters. NOTE(review): the parent signature is not visible in this
      # file; the splat is also harmless if the parent takes an options hash.
      super(**publisher_options)

      @dlx_exchange_name = exchange
    end

    private

    # before_publish callback: writes a debug log entry describing the message
    # about to be published. The text is built inside the block passed to
    # logger.debug.
    #
    # @param publisher [#exchange] publisher whose exchange name is logged
    # @param message [Object] message payload (interpolated into the log line)
    # @param options [Hash] publish options; :routing_key and the 'delay'
    #   entry of :headers are logged
    def log_message(publisher, message, options = {})
      logger.debug do
        delay = options.dig(:headers, 'delay')

        "Publishing <#{message}> to [#{publisher.exchange.name}] with routing_key [#{options[:routing_key]}] and delay [#{delay}]"
      end
    end

    # Declares the durable queue that holds messages for one specific delay.
    # NOTE(review): presumably invoked by ::BunnyPublisher::Mandatory when a
    # published message is returned as unroutable - confirm against that module.
    #
    # @param properties [#headers] message properties; headers must contain 'delay'
    # @raise [KeyError] when the 'delay' header is missing
    # @return the queue object returned by republish_channel.queue
    def declare_republish_queue(_return_info, properties, _message)
      delay = properties.headers.fetch('delay')
      queue_name = delayed_queue_name(delay: delay)

      queue_arguments = {
        'x-queue-mode' => 'lazy', # tell RabbitMQ not to use RAM for this queue as it won't be consumed
        'x-message-ttl' => delay * 1000, # make messages die after requested time (delay is presumably in seconds; TTL is in milliseconds)
        'x-dead-letter-exchange' => dlx_exchange_name # dead messages go to original exchange and then routed to proper queues
      }

      logger.debug { "Creating delayed queue [#{queue_name}]" }

      republish_channel.queue(queue_name, durable: true, arguments: queue_arguments)
    end

    # Binds the delayed queue to the republish exchange using the message's
    # 'delay' header value as the binding argument.
    #
    # @param queue [#bind] queue to bind
    # @param properties [#headers] message properties; headers must contain 'delay'
    # @raise [KeyError] when the 'delay' header is missing
    def declare_republish_queue_binding(queue, _return_info, properties, _message)
      queue.bind(republish_exchange, arguments: { delay: properties.headers.fetch('delay') })
    end

    # Builds the name of the queue for the given delay:
    # "<delayed_queue_prefix>:<delay>", prepended with ActiveJob's
    # queue_name_prefix (joined by queue_name_delimiter) when that prefix is set.
    #
    # @param delay [Object] delay value taken from the message headers
    # @return [String]
    def delayed_queue_name(delay:)
      [
        ::ActiveJob::Base.queue_name_prefix,
        [config_delayed_queue_prefix, delay].join(':')
      ].compact.join(::ActiveJob::Base.queue_name_delimiter)
    end
  end
end