Sha256: 978820b53ffe2eceec8daf4b846a87ad48023bbaeffa0bc6f764d20a376aeaab

Contents?: true

Size: 1.56 KB

Versions: 4

Compression:

Stored size: 1.56 KB

Contents

module RabbitFeed
  # Connection used to publish messages to the exchange named in
  # RabbitFeed.configuration. The exchange is obtained from the channel when
  # an instance is constructed, and a handler is registered for returned
  # messages.
  class ProducerConnection < RabbitFeed::Connection
    # Options applied to every publish call. They are merged on top of the
    # caller-supplied options, so these values win on a key clash.
    PUBLISH_OPTIONS = {
      # Persist the message to disk
      persistent: true,
      # Return the message if it can't be routed to a queue
      mandatory: true
    }.freeze

    # Fixed exchange options. They are merged on top of the configured
    # auto_delete setting (see #exchange_options).
    EXCHANGE_OPTIONS = {
      # Allow wildcard routing keys
      type: :topic,
      # Persist across server restart
      durable: true,
      # Create the exchange if it does not exist
      no_declare: false
    }.freeze

    # Logs a returned message at error level and passes a
    # ReturnedMessageError built from +return_info+ to
    # RabbitFeed.exception_notify.
    #
    # @param return_info the return information handed to the on_return callback
    # @param _content the returned message body (unused)
    # @return whatever RabbitFeed.exception_notify returns
    def self.handle_returned_message(return_info, _content)
      RabbitFeed.log.error do
        { event: :returned_message, return_info: return_info }
      end
      returned_error = ReturnedMessageError.new(return_info)
      RabbitFeed.exception_notify(returned_error)
    end

    # Runs the parent initializer, obtains the configured exchange from the
    # channel, logs the declaration, and registers the returned-message
    # callback on the exchange.
    def initialize
      super
      # NOTE(review): `channel` is not defined in this class; presumably it is
      # provided by RabbitFeed::Connection - confirm.
      @exchange = channel.exchange(RabbitFeed.configuration.exchange, exchange_options)
      RabbitFeed.log.info do
        {
          event: :exchange_declared,
          exchange: RabbitFeed.configuration.exchange,
          options: exchange_options
        }
      end
      exchange.on_return do |return_info, _properties, content|
        RabbitFeed::ProducerConnection.handle_returned_message(return_info, content)
      end
    end

    # Publishes +message+ to the exchange inside a `synchronized` block.
    #
    # @param message the payload handed to the exchange
    # @param options [Hash] publish options; PUBLISH_OPTIONS entries override
    #   any matching keys
    # @return the result of the `synchronized` call
    def publish(message, options)
      # NOTE(review): `synchronized` is not defined in this class; presumably
      # it is provided by RabbitFeed::Connection - confirm.
      synchronized do
        bunny_options = options.merge(PUBLISH_OPTIONS)
        # The debug entry records the caller's options, not the merged ones.
        RabbitFeed.log.debug do
          { event: :publish, options: options, exchange: RabbitFeed.configuration.exchange }
        end
        exchange.publish(message, bunny_options)
      end
    end

    private

    attr_reader :exchange

    # Builds the options used when obtaining the exchange: the configured
    # auto_delete flag with EXCHANGE_OPTIONS merged on top.
    #
    # @return [Hash] a new hash on every call
    def exchange_options
      configured = { auto_delete: RabbitFeed.configuration.auto_delete_exchange }
      configured.merge(EXCHANGE_OPTIONS)
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
rabbit_feed-3.0.3 lib/rabbit_feed/producer_connection.rb
rabbit_feed-3.0.2 lib/rabbit_feed/producer_connection.rb
rabbit_feed-3.0.1 lib/rabbit_feed/producer_connection.rb
rabbit_feed-3.0.0 lib/rabbit_feed/producer_connection.rb