Sha256: 6008a93c6f6a2d16855ada16ac3fdc373fdc818adc32155ef14b3fa2338b13d8

Contents?: true

Size: 1.01 KB

Versions: 1

Compression:

Stored size: 1.01 KB

Contents

module Outboxable
  module RabbitMq
    # Publishes a single outbox resource to a RabbitMQ topic exchange and,
    # once the broker confirms the message, marks the resource as published.
    #
    # This class is a template: {#to_envelope} must be overridden to turn the
    # resource into the payload that is actually sent.
    class Publisher
      # @param resource the outbox record to publish. This class calls
      #   +exchange+, +routing_key+, +try+, +reload+, +increment_attempt+ and
      #   +update+ on it (presumably an ActiveRecord-like model -- confirm
      #   against the caller).
      def initialize(resource:)
        @resource = resource
      end

      # Builds the message payload for +resource+. The base implementation
      # always raises; concrete publishers are expected to override it.
      #
      # @param resource the outbox record being published
      # @raise [NotImplementedError] always, in this base class. Note that
      #   NotImplementedError is a ScriptError, so a bare +rescue+ will not
      #   catch it.
      def to_envelope(resource:)
        raise NotImplementedError, 'Please implement the to_envelope method in your own module'
      end

      # Sends the resource to the broker and records the outcome.
      #
      # When the broker does not confirm the message nothing is written to
      # the resource and +nil+ is returned. Otherwise the resource is
      # reloaded, its attempt counter is incremented, and it is updated to
      # +status: :published+ with +retry_at+ cleared.
      #
      # @return the result of +@resource.update+ on success, +nil+ otherwise
      def publish
        return unless deliver_and_await_confirmation

        @resource.reload
        @resource.increment_attempt
        @resource.update(status: :published, retry_at: nil)
      end

      private

      # Checks a channel out of Outboxable::Connection (presumably a
      # connection pool -- only +with+ is used here), publishes the envelope
      # on a durable topic exchange with publisher confirms enabled, and
      # waits for the broker's answer.
      #
      # @return whatever +channel.wait_for_confirms+ returned, or +nil+ if the
      #   block was never run
      def deliver_and_await_confirmation
        acknowledged = nil

        Outboxable::Connection.instance.channel.with do |channel|
          # Confirms must be enabled before publishing so the wait below has
          # something to wait for.
          channel.confirm_select

          topic_exchange = channel.topic(@resource.exchange, durable: true)

          # Evaluated in the same order as the publish arguments:
          # envelope, routing key, then headers.
          payload = to_envelope(resource: @resource)
          routing_key = @resource.routing_key
          # Resources without a +headers+ method fall back to an empty hash.
          headers = @resource.try(:headers) || {}

          topic_exchange.publish(payload, routing_key: routing_key, headers: headers)

          # Captured in the outer local rather than relying on what +with+
          # itself returns.
          acknowledged = channel.wait_for_confirms
        end

        acknowledged
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
outboxable-0.1.3 lib/outboxable/rabbitmq/publisher.rb