Sha256: c5caaa9ed816fbde26dd4e189b571a0840a23cef1d375d0b9dd1c075e3ace8ed

Contents?: true

Size: 1.61 KB

Versions: 1

Compression:

Stored size: 1.61 KB

Contents

module Basquiat
  module Adapters
    class RabbitMq
      class DeadLettering < BaseStrategy
        class << self
          attr_reader :options

          def setup(opts)
            @options = {
                session: { queue: {
                    options: { 'x-dead-letter-exchange' => opts.fetch(:exchange, 'basquiat.dlx') }
                } },
                dlx:     { ttl: opts.fetch(:ttl, 1_000) } }
          end

          def session_options
            options.fetch :session
          rescue KeyError
            raise 'You have to setup the strategy first'
          end
        end

        def initialize(session)
          super
          setup_dead_lettering
        end

        def run(message)
          catch :skip_processing do
            check_incoming_messages(message)
            yield
          end
          public_send(message.action, message.delivery_tag)
        end

        private

        def check_incoming_messages(message)
          message.props.headers and
              message.props.headers['x-death'][1]['queue'] != @session.queue.name and
              throw(:skip_processing)
        end

        def options
          self.class.options
        end

        def setup_dead_lettering
          dlx   = @session.channel.topic('basquiat.dlx')
          queue = @session.channel.queue('basquiat.dlq',
                                         arguments: { 'x-dead-letter-exchange' => @session.exchange.name,
                                                      'x-message-ttl'          => options[:dlx][:ttl] })
          queue.bind(dlx, routing_key: '*.#')
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
basquiat-1.2.0 lib/basquiat/adapters/rabbitmq/requeue_strategies/dead_lettering.rb