lib/amq/client/async/extensions/rabbitmq/confirm.rb in amq-client-1.0.4 vs lib/amq/client/async/extensions/rabbitmq/confirm.rb in amq-client-1.1.0.pre1
- old
+ new
@@ -3,228 +3,12 @@
module AMQ
module Client
module Async
module Extensions
module RabbitMQ
- # h2. Purpose
- # In case that the broker crashes, some messages can get lost.
- # Thanks to this extension, broker sends Basic.Ack when the message
- # is processed by the broker. In case of persistent messages, it must
- # be written to disk or ack'd on all the queues it was delivered to.
- # However it doesn't have to be necessarily 1:1, because the broker
- # can send Basic.Ack with multi flag to acknowledge multiple messages.
- #
- # So it provides clients a lightweight way of keeping track of which
- # messages have been processed by the broker and which would need
- # re-publishing in case of broker shutdown or network failure.
- #
- # Transactions are solving the same problem, but they are very slow:
- # confirmations are more than 100 times faster.
- #
- # h2. Workflow
- # * Client asks broker to confirm messages on given channel (Confirm.Select).
- # * Broker sends back Confirm.Select-Ok, unless we sent Confirm.Select with nowait=true.
- # * After each published message, the client receives Basic.Ack from the broker.
- # * If something bad happens inside the broker, it sends Basic.Nack.
- #
- # h2. Gotchas
- # Note that we don't keep track of messages awaiting confirmation.
- # It'd add a huge overhead and it's impossible to come up with one-suits-all solution.
- # If you want to create such module, you'll probably want to redefine Channel#after_publish,
- # so it will put messages into a queue and then handlers for Basic.Ack and Basic.Nack.
- # This is the reason why we pass every argument from Exchange#publish to Channel#after_publish.
- # You should not forget though, that both of these methods can have multi flag!
- #
- # Transactional channel cannot be put into confirm mode and a confirm
- # mode channel cannot be made transactional.
- #
- # If the connection between the publisher and broker drops with outstanding
- # confirms, it does not necessarily mean that the messages were lost, so
- # republishing may result in duplicate messages.
-
- # h2. Learn more
- # @see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms
- # @see http://www.rabbitmq.com/amqp-0-9-1-quickref.html#class.confirm
- # @see http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.ack
module Confirm
- module ChannelMixin
-
- # Change publisher index. Publisher index is incremented
- # by 1 after each Basic.Publish starting at 1. This is done
- # on both client and server, hence this acknowledged messages
- # can be matched via its delivery-tag.
- #
- # @api private
- attr_writer :publisher_index
-
- # Publisher index is an index of the last message since
- # the confirmations were activated, started with 0. It's
- # incremented by 1 every time a message is published.
- # This is done on both client and server, hence this
- # acknowledged messages can be matched via its delivery-tag.
- #
- # @return [Integer] Current publisher index.
- # @api public
- def publisher_index
- @publisher_index ||= 0
- end
-
- # Resets publisher index to 0
- #
- # @api plugin
- def reset_publisher_index!
- @publisher_index = 0
- end
-
-
- # This method is executed after publishing of each message via {Exchage#publish}.
- # Currently it just increments publisher index by 1, so messages
- # can be actually matched.
- #
- # @api plugin
- def increment_publisher_index!
- @publisher_index += 1
- end
-
- # Turn on confirmations for this channel and, if given,
- # register callback for Confirm.Select-Ok.
- #
- # @raise [RuntimeError] Occurs when confirmations are already activated.
- # @raise [RuntimeError] Occurs when nowait is true and block is given.
- #
- # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not.
- # @yield [method] Callback which will be executed once we receive Confirm.Select-Ok.
- # @yieldparam [AMQ::Protocol::Confirm::SelectOk] method Protocol method class instance.
- #
- # @return [self] self.
- #
- # @see #confirm
- def confirm_select(nowait = false, &block)
- if nowait && block
- raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
- end
-
- @uses_publisher_confirmations = true
- reset_publisher_index!
-
- self.redefine_callback(:confirm_select, &block) unless nowait
- self.redefine_callback(:after_publish) do
- increment_publisher_index!
- end
- @connection.send_frame(Protocol::Confirm::Select.encode(@id, nowait))
-
- self
- end
-
- # @return [Boolean]
- def uses_publisher_confirmations?
- @uses_publisher_confirmations
- end # uses_publisher_confirmations?
-
-
- # Turn on confirmations for this channel and, if given,
- # register callback for basic.ack from the broker.
- #
- # @raise [RuntimeError] Occurs when confirmations are already activated.
- # @raise [RuntimeError] Occurs when nowait is true and block is given.
- # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not.
- #
- # @yield [basick_ack] Callback which will be executed every time we receive Basic.Ack from the broker.
- # @yieldparam [AMQ::Protocol::Basic::Ack] basick_ack Protocol method class instance.
- #
- # @return [self] self.
- def on_ack(nowait = false, &block)
- self.use_publisher_confirmations! unless self.uses_publisher_confirmations?
-
- self.define_callback(:ack, &block) if block
-
- self
- end
-
-
- # Register error callback for Basic.Nack. It's called
- # when message(s) is rejected.
- #
- # @return [self] self
- def on_nack(&block)
- self.define_callback(:nack, &block) if block
-
- self
- end
-
-
-
-
- # Handler for Confirm.Select-Ok. By default, it just
- # executes hook specified via the #confirmations method
- # with a single argument, a protocol method class
- # instance (an instance of AMQ::Protocol::Confirm::SelectOk)
- # and then it deletes the callback, since Confirm.Select
- # is supposed to be sent just once.
- #
- # @api plugin
- def handle_select_ok(method)
- self.exec_callback_once(:confirm_select, method)
- end
-
- # Handler for Basic.Ack. By default, it just
- # executes hook specified via the #confirm method
- # with a single argument, a protocol method class
- # instance (an instance of AMQ::Protocol::Basic::Ack).
- #
- # @api plugin
- def handle_basic_ack(method)
- self.exec_callback(:ack, method)
- end
-
-
- # Handler for Basic.Nack. By default, it just
- # executes hook specified via the #confirm_failed method
- # with a single argument, a protocol method class
- # instance (an instance of AMQ::Protocol::Basic::Nack).
- #
- # @api plugin
- def handle_basic_nack(method)
- self.exec_callback(:nack, method)
- end
-
-
- def reset_state!
- super
-
- @uses_publisher_confirmations = false
- end
-
-
- def self.included(host)
- host.handle(Protocol::Confirm::SelectOk) do |connection, frame|
- method = frame.decode_payload
- channel = connection.channels[frame.channel]
- channel.handle_select_ok(method)
- end
-
- host.handle(Protocol::Basic::Ack) do |connection, frame|
- method = frame.decode_payload
- channel = connection.channels[frame.channel]
- channel.handle_basic_ack(method)
- end
-
- host.handle(Protocol::Basic::Nack) do |connection, frame|
- method = frame.decode_payload
- channel = connection.channels[frame.channel]
- channel.handle_basic_nack(method)
- end
- end # self.included(host)
- end # ChannelMixin
end # Confirm
end # RabbitMQ
end # Extensions
-
-
- class Channel
- # use modules, a native Ruby way of extension of existing classes,
- # instead of reckless monkey-patching. MK.
- include Extensions::RabbitMQ::Confirm::ChannelMixin
- end # Channel
end # Async
end # Client
end # AMQ