Sha256: 0370e21a6d22ab0e5c37daa784fe07350592bccf07bb830df1a71ec4fed1ddff

Contents?: true

Size: 1.47 KB

Versions: 3

Compression:

Stored size: 1.47 KB

Contents

# frozen_string_literal: true

module RabbitmqClient
  # ExchangeRegistry is a store for all managed exchanges and their details
  class MessagePublisher
    # Custom error is thrown when rabbitmq do not confirm publishing an event
    class ConfirmationFailed < StandardError
      def initialize(exchange, nacked, unconfirmed)
        msg = 'Message confirmation on the exchange ' \
              "#{exchange} has failed (#{nacked}/#{unconfirmed})."
        super(msg)
      end
    end

    def initialize(data, exchange, channel, options)
      @data = data.to_json
      @exchange = exchange
      @channel = channel
      @options = { headers: {} }.merge(options)
      @options[:headers][:tags] = TagsFilter.tags
    end

    def publish
      exchange = @exchange.create(@channel)

      notify('publishing_message')
      exchange.publish(@data, **@options)
      notify('published_message')
    end

    def wait_for_confirms
      notify('confirming_message')
      if @channel.wait_for_confirms
        notify('message_confirmed')
        return
      end
      raise ConfirmationFailed.new(@exchange.name, @channel.nacked_set,
                                   @channel.unconfirmed_set)
    end

    private

    def notify(event)
      ActiveSupport::Notifications.instrument(
        "#{event}.rabbitmq_client",
        message_payload
      )
    end

    def message_payload
      {
        exchange: @exchange.name,
        message_id: @options[:message_id]
      }
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
rabbitmq_client-0.0.3 lib/rabbitmq_client/message_publisher.rb
rabbitmq_client-0.0.2 lib/rabbitmq_client/message_publisher.rb
rabbitmq_client-0.0.1 lib/rabbitmq_client/message_publisher.rb