Sha256: 9ba95f12b12d23879f27a34ee111611f580d9d441fe10e70cf107e0de26e9490
Contents?: true
Size: 1.53 KB
Versions: 3
Compression:
Stored size: 1.53 KB
Contents
# frozen_string_literal: true

module Basquiat
  module Adapters
    class RabbitMq
      # Requeue strategy that relies on RabbitMQ dead lettering.
      #
      # The work queue is declared with an 'x-dead-letter-exchange' argument
      # (see {.session_options}). On instantiation the strategy declares that
      # exchange plus a 'basquiat.dlq' queue whose own dead-letter exchange is
      # the session exchange and whose messages carry a TTL.
      class DeadLettering < BaseStrategy
        class << self
          # @return [Hash, nil] the options built by {.setup}; nil until
          #   {.setup} has been called.
          attr_reader :options

          # Builds the strategy options from user supplied settings.
          #
          # @param opts [Hash] user settings
          # @option opts [String] :exchange ('basquiat.dlx') name of the
          #   dead-letter exchange
          # @option opts [Integer] :ttl (1_000) value used for the dead-letter
          #   queue's 'x-message-ttl' argument (milliseconds per the RabbitMQ
          #   documentation)
          # @return [Hash] the stored options
          def setup(opts)
            @options = {
              session: {
                queue: {
                  options: {
                    'x-dead-letter-exchange' => opts.fetch(:exchange, 'basquiat.dlx')
                  }
                }
              },
              dlx: { ttl: opts.fetch(:ttl, 1_000) }
            }
          end

          # @return [Hash] the :session portion of the options
          # @raise [NoMethodError] if {.setup} has not been called yet
          def session_options
            options.fetch :session
          end
        end

        # @param session the adapter session; it is forwarded to the parent
        #   initializer and then used to declare the dead-letter topology.
        def initialize(session)
          super
          setup_dead_lettering
        end

        # Yields to the caller's processing block unless the message is
        # recognised as belonging to another queue, then dispatches the
        # message's action on this strategy.
        #
        # @param message an object responding to #props (with #headers) and
        #   #action
        # @yield processes the message
        def run(message)
          catch :skip_processing do
            check_incoming_messages(message.props.headers)
            yield
          end
          # Runs whether or not processing was skipped.
          public_send(message.action, message)
        end

        private

        # Throws :skip_processing when the message was dead-lettered from a
        # queue other than the session's queue.
        #
        # Messages with no headers, no 'x-death' header, or fewer than two
        # 'x-death' entries are treated as not dead-lettered and are processed
        # normally instead of raising NoMethodError on nil.
        #
        # NOTE(review): the second 'x-death' entry is presumably the queue the
        # message was first rejected from (the first being the dead-letter
        # queue it expired in) -- confirm against the broker's ordering.
        #
        # @param headers [Hash, nil] the message headers
        def check_incoming_messages(headers)
          origin = headers&.dig('x-death', 1, 'queue')
          throw(:skip_processing) if origin && origin != session.queue.name
        end

        # @return [Hash, nil] the class level options
        def options
          self.class.options
        end

        # Name of the dead-letter exchange configured through {.setup}.
        # Read from the session options so the exchange declared here is the
        # same one the work queue dead-letters to.
        def dead_letter_exchange_name
          options.dig(:session, :queue, :options, 'x-dead-letter-exchange')
        end

        # Declares the dead-letter exchange and the 'basquiat.dlq' queue, and
        # binds the queue to the exchange for every routing key ('#').
        def setup_dead_lettering
          dlx   = session.channel.topic(dead_letter_exchange_name)
          queue = session.channel.queue(
            'basquiat.dlq',
            arguments: {
              # Dead-lettering from the dlq targets the session exchange.
              'x-dead-letter-exchange' => session.exchange.name,
              'x-message-ttl'          => options[:dlx][:ttl]
            }
          )
          queue.bind(dlx, routing_key: '#')
        end
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems