lib/amq/client/async/channel.rb in amq-client-1.0.4 vs lib/amq/client/async/channel.rb in amq-client-1.1.0.pre1
- old
+ new
@@ -33,11 +33,19 @@
attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_get_response
attr_reader :consumers_awaiting_consume_ok, :consumers_awaiting_cancel_ok
attr_accessor :flow_is_active
+ # Change publisher index. Publisher index is incremented
+ # by 1 after each Basic.Publish starting at 1. This is done
+ # on both client and server, hence this acknowledged messages
+ # can be matched via its delivery-tag.
+ #
+ # @api private
+ attr_writer :publisher_index
+
def initialize(connection, id, options = {})
super(connection)
@id = id
@exchanges = Hash.new
@@ -138,19 +146,24 @@
@connection.send_frame(Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple))
self
end # acknowledge(delivery_tag, multiple = false)
- # Reject a message with given delivery tag.
+ # Reject a message with given delivery tag. When rejecting multiple messages at once,
+ # uses back.nack instead of basic.reject.
#
# @api public
- # @see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol reference (Section 1.8.3.14.)
- def reject(delivery_tag, requeue = true)
- @connection.send_frame(Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue))
+ # @see http://www.rabbitmq.com/amqp-0-9-1-quickref.html#basic.nack
+ def reject(delivery_tag, requeue = true, multi = false)
+ if multi
+ @connection.send_frame(Protocol::Basic::Nack.encode(self.id, delivery_tag, multi, requeue))
+ else
+ @connection.send_frame(Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue))
+ end
self
- end # reject(delivery_tag, requeue = true)
+ end # reject
# Notifies AMQ broker that consumer has recovered and unacknowledged messages need
# to be redelivered.
#
# @return [Channel] self
@@ -330,11 +343,144 @@
end # auto_recover
# @endgroup
+ # Publisher index is an index of the last message since
+ # the confirmations were activated, started with 0. It's
+ # incremented by 1 every time a message is published.
+ # This is done on both client and server, hence this
+ # acknowledged messages can be matched via its delivery-tag.
#
+ # @return [Integer] Current publisher index.
+ # @api public
+ def publisher_index
+ @publisher_index ||= 0
+ end
+
+ # Resets publisher index to 0
+ #
+ # @api plugin
+ def reset_publisher_index!
+ @publisher_index = 0
+ end
+
+
+ # This method is executed after publishing of each message via {Exchage#publish}.
+ # Currently it just increments publisher index by 1, so messages
+ # can be actually matched.
+ #
+ # @api plugin
+ def increment_publisher_index!
+ @publisher_index += 1
+ end
+
+ # Turn on confirmations for this channel and, if given,
+ # register callback for Confirm.Select-Ok.
+ #
+ # @raise [RuntimeError] Occurs when confirmations are already activated.
+ # @raise [RuntimeError] Occurs when nowait is true and block is given.
+ #
+ # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not.
+ # @yield [method] Callback which will be executed once we receive Confirm.Select-Ok.
+ # @yieldparam [AMQ::Protocol::Confirm::SelectOk] method Protocol method class instance.
+ #
+ # @return [self] self.
+ #
+ # @see #confirm
+ def confirm_select(nowait = false, &block)
+ if nowait && block
+ raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
+ end
+
+ @uses_publisher_confirmations = true
+ reset_publisher_index!
+
+ self.redefine_callback(:confirm_select, &block) unless nowait
+ self.redefine_callback(:after_publish) do
+ increment_publisher_index!
+ end
+ @connection.send_frame(Protocol::Confirm::Select.encode(@id, nowait))
+
+ self
+ end
+
+ # @return [Boolean]
+ def uses_publisher_confirmations?
+ @uses_publisher_confirmations
+ end # uses_publisher_confirmations?
+
+
+ # Turn on confirmations for this channel and, if given,
+ # register callback for basic.ack from the broker.
+ #
+ # @raise [RuntimeError] Occurs when confirmations are already activated.
+ # @raise [RuntimeError] Occurs when nowait is true and block is given.
+ # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not.
+ #
+ # @yield [basick_ack] Callback which will be executed every time we receive Basic.Ack from the broker.
+ # @yieldparam [AMQ::Protocol::Basic::Ack] basick_ack Protocol method class instance.
+ #
+ # @return [self] self.
+ def on_ack(nowait = false, &block)
+ self.use_publisher_confirmations! unless self.uses_publisher_confirmations?
+
+ self.define_callback(:ack, &block) if block
+
+ self
+ end
+
+
+ # Register error callback for Basic.Nack. It's called
+ # when message(s) is rejected.
+ #
+ # @return [self] self
+ def on_nack(&block)
+ self.define_callback(:nack, &block) if block
+
+ self
+ end
+
+
+
+
+ # Handler for Confirm.Select-Ok. By default, it just
+ # executes hook specified via the #confirmations method
+ # with a single argument, a protocol method class
+ # instance (an instance of AMQ::Protocol::Confirm::SelectOk)
+ # and then it deletes the callback, since Confirm.Select
+ # is supposed to be sent just once.
+ #
+ # @api plugin
+ def handle_select_ok(method)
+ self.exec_callback_once(:confirm_select, method)
+ end
+
+ # Handler for Basic.Ack. By default, it just
+ # executes hook specified via the #confirm method
+ # with a single argument, a protocol method class
+ # instance (an instance of AMQ::Protocol::Basic::Ack).
+ #
+ # @api plugin
+ def handle_basic_ack(method)
+ self.exec_callback(:ack, method)
+ end
+
+
+ # Handler for Basic.Nack. By default, it just
+ # executes hook specified via the #confirm_failed method
+ # with a single argument, a protocol method class
+ # instance (an instance of AMQ::Protocol::Basic::Nack).
+ #
+ # @api plugin
+ def handle_basic_nack(method)
+ self.exec_callback(:nack, method)
+ end
+
+
+
+ #
# Implementation
#
def register_exchange(exchange)
raise ArgumentError, "argument is nil!" if exchange.nil?
@@ -388,10 +534,11 @@
@consumers_awaiting_cancel_ok = Array.new
@queues_awaiting_get_response = Array.new
@callbacks = @callbacks.delete_if { |k, v| !RECOVERY_EVENTS.include?(k) }
+ @uses_publisher_confirmations = false
end # reset_state!
# @api plugin
# @private
@@ -470,9 +617,27 @@
end
self.handle(Protocol::Tx::RollbackOk) do |connection, frame|
channel = connection.channels[frame.channel]
channel.exec_callback(:tx_rollback, frame.decode_payload)
+ end
+
+ self.handle(Protocol::Confirm::SelectOk) do |connection, frame|
+ method = frame.decode_payload
+ channel = connection.channels[frame.channel]
+ channel.handle_select_ok(method)
+ end
+
+ self.handle(Protocol::Basic::Ack) do |connection, frame|
+ method = frame.decode_payload
+ channel = connection.channels[frame.channel]
+ channel.handle_basic_ack(method)
+ end
+
+ self.handle(Protocol::Basic::Nack) do |connection, frame|
+ method = frame.decode_payload
+ channel = connection.channels[frame.channel]
+ channel.handle_basic_nack(method)
end
end # Channel
end # Async
end # Client
end # AMQ