lib/amq/client/async/channel.rb in amq-client-1.0.4 vs lib/amq/client/async/channel.rb in amq-client-1.1.0.pre1

- old
+ new

@@ -33,11 +33,19 @@ attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_get_response attr_reader :consumers_awaiting_consume_ok, :consumers_awaiting_cancel_ok attr_accessor :flow_is_active + # Change publisher index. Publisher index is incremented + # by 1 after each Basic.Publish starting at 1. This is done + # on both client and server, hence this acknowledged messages + # can be matched via its delivery-tag. + # + # @api private + attr_writer :publisher_index + def initialize(connection, id, options = {}) super(connection) @id = id @exchanges = Hash.new @@ -138,19 +146,24 @@ @connection.send_frame(Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple)) self end # acknowledge(delivery_tag, multiple = false) - # Reject a message with given delivery tag. + # Reject a message with given delivery tag. When rejecting multiple messages at once, + # uses back.nack instead of basic.reject. # # @api public - # @see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol reference (Section 1.8.3.14.) - def reject(delivery_tag, requeue = true) - @connection.send_frame(Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue)) + # @see http://www.rabbitmq.com/amqp-0-9-1-quickref.html#basic.nack + def reject(delivery_tag, requeue = true, multi = false) + if multi + @connection.send_frame(Protocol::Basic::Nack.encode(self.id, delivery_tag, multi, requeue)) + else + @connection.send_frame(Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue)) + end self - end # reject(delivery_tag, requeue = true) + end # reject # Notifies AMQ broker that consumer has recovered and unacknowledged messages need # to be redelivered. # # @return [Channel] self @@ -330,11 +343,144 @@ end # auto_recover # @endgroup + # Publisher index is an index of the last message since + # the confirmations were activated, started with 0. It's + # incremented by 1 every time a message is published. + # This is done on both client and server, hence this + # acknowledged messages can be matched via its delivery-tag. # + # @return [Integer] Current publisher index. + # @api public + def publisher_index + @publisher_index ||= 0 + end + + # Resets publisher index to 0 + # + # @api plugin + def reset_publisher_index! + @publisher_index = 0 + end + + + # This method is executed after publishing of each message via {Exchage#publish}. + # Currently it just increments publisher index by 1, so messages + # can be actually matched. + # + # @api plugin + def increment_publisher_index! + @publisher_index += 1 + end + + # Turn on confirmations for this channel and, if given, + # register callback for Confirm.Select-Ok. + # + # @raise [RuntimeError] Occurs when confirmations are already activated. + # @raise [RuntimeError] Occurs when nowait is true and block is given. + # + # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not. + # @yield [method] Callback which will be executed once we receive Confirm.Select-Ok. + # @yieldparam [AMQ::Protocol::Confirm::SelectOk] method Protocol method class instance. + # + # @return [self] self. + # + # @see #confirm + def confirm_select(nowait = false, &block) + if nowait && block + raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense" + end + + @uses_publisher_confirmations = true + reset_publisher_index! + + self.redefine_callback(:confirm_select, &block) unless nowait + self.redefine_callback(:after_publish) do + increment_publisher_index! + end + @connection.send_frame(Protocol::Confirm::Select.encode(@id, nowait)) + + self + end + + # @return [Boolean] + def uses_publisher_confirmations? + @uses_publisher_confirmations + end # uses_publisher_confirmations? + + + # Turn on confirmations for this channel and, if given, + # register callback for basic.ack from the broker. + # + # @raise [RuntimeError] Occurs when confirmations are already activated. + # @raise [RuntimeError] Occurs when nowait is true and block is given. + # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not. + # + # @yield [basick_ack] Callback which will be executed every time we receive Basic.Ack from the broker. + # @yieldparam [AMQ::Protocol::Basic::Ack] basick_ack Protocol method class instance. + # + # @return [self] self. + def on_ack(nowait = false, &block) + self.use_publisher_confirmations! unless self.uses_publisher_confirmations? + + self.define_callback(:ack, &block) if block + + self + end + + + # Register error callback for Basic.Nack. It's called + # when message(s) is rejected. + # + # @return [self] self + def on_nack(&block) + self.define_callback(:nack, &block) if block + + self + end + + + + + # Handler for Confirm.Select-Ok. By default, it just + # executes hook specified via the #confirmations method + # with a single argument, a protocol method class + # instance (an instance of AMQ::Protocol::Confirm::SelectOk) + # and then it deletes the callback, since Confirm.Select + # is supposed to be sent just once. + # + # @api plugin + def handle_select_ok(method) + self.exec_callback_once(:confirm_select, method) + end + + # Handler for Basic.Ack. By default, it just + # executes hook specified via the #confirm method + # with a single argument, a protocol method class + # instance (an instance of AMQ::Protocol::Basic::Ack). + # + # @api plugin + def handle_basic_ack(method) + self.exec_callback(:ack, method) + end + + + # Handler for Basic.Nack. By default, it just + # executes hook specified via the #confirm_failed method + # with a single argument, a protocol method class + # instance (an instance of AMQ::Protocol::Basic::Nack). + # + # @api plugin + def handle_basic_nack(method) + self.exec_callback(:nack, method) + end + + + + # # Implementation # def register_exchange(exchange) raise ArgumentError, "argument is nil!" if exchange.nil? @@ -388,10 +534,11 @@ @consumers_awaiting_cancel_ok = Array.new @queues_awaiting_get_response = Array.new @callbacks = @callbacks.delete_if { |k, v| !RECOVERY_EVENTS.include?(k) } + @uses_publisher_confirmations = false end # reset_state! # @api plugin # @private @@ -470,9 +617,27 @@ end self.handle(Protocol::Tx::RollbackOk) do |connection, frame| channel = connection.channels[frame.channel] channel.exec_callback(:tx_rollback, frame.decode_payload) + end + + self.handle(Protocol::Confirm::SelectOk) do |connection, frame| + method = frame.decode_payload + channel = connection.channels[frame.channel] + channel.handle_select_ok(method) + end + + self.handle(Protocol::Basic::Ack) do |connection, frame| + method = frame.decode_payload + channel = connection.channels[frame.channel] + channel.handle_basic_ack(method) + end + + self.handle(Protocol::Basic::Nack) do |connection, frame| + method = frame.decode_payload + channel = connection.channels[frame.channel] + channel.handle_basic_nack(method) end end # Channel end # Async end # Client end # AMQ