Sha256: 563d279190a07cfb246f11bc7359c94b8375770f2a15264554df2150605498b6

Contents?: true

Size: 1.04 KB

Versions: 3

Compression:

Stored size: 1.04 KB

Contents

module Outboxable
  module RabbitMq
    # Base publisher that sends a single outbox resource to a durable topic
    # exchange and marks the resource as published once the broker confirms it.
    #
    # Subclasses are expected to override {#to_envelope}; the base
    # implementation always raises.
    class Publisher
      # @param resource [Object] the record to publish. This class calls
      #   +exchange+, +routing_key+, +try(:headers)+, +reload+,
      #   +increment_attempt+ and +update+ on it.
      def initialize(resource:)
        @resource = resource
      end

      # Builds the message payload for +resource+.
      #
      # @param resource [Object] the record being published
      # @raise [NotImplementedError] always, unless overridden
      def to_envelope(resource:)
        raise NotImplementedError, "Please implement the to_envelope method in your own module"
      end

      # Publishes the resource and, when the broker confirms the message,
      # records the successful attempt on the resource.
      #
      # @return [Object, nil] the result of +@resource.update+ when the publish
      #   was confirmed, +nil+ otherwise (the resource is left untouched)
      def publish
        # Assigned from inside the block so the outcome does not depend on
        # what `with` itself returns.
        acknowledged = nil

        Outboxable::Connection.instance.channel.with do |amqp_channel|
          # Put the channel into publisher-confirms mode before sending.
          amqp_channel.confirm_select

          topic_exchange = amqp_channel.topic(@resource.exchange, durable: true)

          payload = to_envelope(resource: @resource)
          routing_key = @resource.routing_key
          # Headers are optional on the resource; default to an empty hash.
          headers = @resource.try(:headers) || {}

          topic_exchange.publish(payload, routing_key: routing_key, headers: headers)

          acknowledged = amqp_channel.wait_for_confirms
        end

        if acknowledged
          # Refresh the record before mutating it, then flag it as published.
          @resource.reload
          @resource.increment_attempt
          @resource.update(status: :published, retry_at: nil)
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
outboxable-0.1.2 lib/outboxable/rabbitmq/publisher.rb
outboxable-0.1.1 lib/outboxable/rabbitmq/publisher.rb
outboxable-0.1.0 lib/outboxable/rabbitmq/publisher.rb