Sha256: 17f0c94c9e727167836efb96f0b2ecdbe78d40326849a00097f3cf040031c2ce

Contents?: true

Size: 1.91 KB

Versions: 2

Compression:

Stored size: 1.91 KB

Contents

# frozen_string_literal: true

module AdvancedSneakersActiveJob
  # This publisher relies on TTL to keep messages in a queue.
  # When TTL is reached, messages go to another exchange (see dlx_exchange_name param).
  class DelayedPublisher < ::BunnyPublisher::Base
    include ::BunnyPublisher::Mandatory

    delegate :logger, to: :'::ActiveJob::Base'

    delegate :name_prefix, :delayed_queue_prefix,
             to: :'AdvancedSneakersActiveJob.config',
             prefix: :config

    before_publish :log_message

    attr_reader :dlx_exchange_name

    def initialize(exchange:, **options)
      super(**options.merge(exchange: [exchange, 'delayed'].join('-'), exchange_options: { type: 'headers', durable: true }))

      @dlx_exchange_name = exchange
    end

    private

    def log_message
      logger.debug do
        "Publishing <#{message}> to [#{@exchange_name}] with routing_key [#{message_options[:routing_key]}] and delay [#{delay}]"
      end
    end

    def declare_republish_queue
      queue_name = delayed_queue_name(delay: delay)

      queue_arguments = {
        'x-queue-mode' => 'lazy', # tell RabbitMQ not to use RAM for this queue as it won't be consumed
        'x-message-ttl' => delay * 1000, # make messages die after requested time
        'x-dead-letter-exchange' => dlx_exchange_name # dead messages go to original exchange and then routed to proper queues
      }

      logger.debug { "Creating delayed queue [#{queue_name}]" }

      channel.queue(queue_name, durable: true, arguments: queue_arguments)
    end

    def delay
      message_options.dig(:headers, 'delay')
    end

    def declare_republish_queue_binding(queue)
      queue.bind(exchange, arguments: { delay: delay })
    end

    def delayed_queue_name(delay:)
      [
        ::ActiveJob::Base.queue_name_prefix,
        [config_delayed_queue_prefix, delay].join(':')
      ].compact.join(::ActiveJob::Base.queue_name_delimiter)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
advanced-sneakers-activejob-0.6.0 lib/advanced_sneakers_activejob/delayed_publisher.rb
advanced-sneakers-activejob-0.5.0 lib/advanced_sneakers_activejob/delayed_publisher.rb