Sha256: 0548257d827157bf8200ae315a0bce7e0db6751fc20b417b993b9e9179475d33
Contents?: true
Size: 1.35 KB
Versions: 5
Compression:
Stored size: 1.35 KB
Contents
module Fluffle class Confirmer attr_reader :channel def initialize(channel:) @channel = channel @pending_confirms = Concurrent::Map.new end # Enables confirms on the channel and sets up callback to receive and # unblock corresponding `with_confirmation` call. def confirm_select handle_confirm = ->(tag, _multiple, nack) do ivar = @pending_confirms.delete tag if ivar ivar.set nack else self.logger.error "Missing confirm IVar: tag=#{tag}" end end # Set the channel in confirmation mode so that we can receive confirms # of published messages @channel.confirm_select handle_confirm end # Wraps a block (which should publish a message) with a blocking check # that it received a confirmation from the RabbitMQ server that the # message that was received and routed successfully. def with_confirmation(timeout:) tag = @channel.next_publish_seq_no confirm_ivar = Concurrent::IVar.new @pending_confirms[tag] = confirm_ivar result = yield nack = confirm_ivar.value timeout if confirm_ivar.incomplete? raise Errors::TimeoutError.new('Timed out waiting for confirm') elsif nack raise Errors::NackError.new('Received nack from confirmation') end result end end end
Version data entries
5 entries across 5 versions & 1 rubygems