Sha256: e3c6f548bfba91fc7c6e6886b336433f626b7452c6b1b12519913484ce7fd7b5

Contents?: true

Size: 1.36 KB

Versions: 5

Compression:

Stored size: 1.36 KB

Contents

module Fluffle
  class Confirmer
    attr_reader :channel

    def initialize(channel:)
      @channel = channel

      @pending_confirms = Concurrent::Map.new
    end

    # Enables confirms on the channel and sets up callback to receive and
    # unblock corresponding `with_confirmation` call.
    def confirm_select
      handle_confirm = ->(tag, _multiple, nack) do
        ivar = @pending_confirms.delete tag

        if ivar
          ivar.set nack
        else
          self.logger.error "Missing confirm IVar: tag=#{tag}"
        end
      end

      # Set the channel in confirmation mode so that we can receive confirms
      # of published messages
      @channel.confirm_select handle_confirm
    end

    # Wraps a block (which should publish a message) with a blocking check
    # that it received a confirmation from the RabbitMQ server that the
    # message that was received and routed successfully.
    def with_confirmation(timeout:)
      tag = @channel.next_publish_seq_no
      confirm_ivar = Concurrent::IVar.new
      @pending_confirms[tag] = confirm_ivar

      result = yield

      nack = confirm_ivar.value timeout
      if confirm_ivar.incomplete?
        raise Errors::ConfirmTimeoutError.new('Timed out waiting for confirm')
      elsif nack
        raise Errors::NackError.new('Received nack from confirmation')
      end

      result
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
fluffle-1.1.0 lib/fluffle/confirmer.rb
fluffle-1.0.1 lib/fluffle/confirmer.rb
fluffle-1.0.0 lib/fluffle/confirmer.rb
fluffle-0.9.1 lib/fluffle/confirmer.rb
fluffle-0.9.0 lib/fluffle/confirmer.rb