lib/amq/client/async/extensions/rabbitmq/confirm.rb in amq-client-0.9.0 vs lib/amq/client/async/extensions/rabbitmq/confirm.rb in amq-client-0.9.1
- old
+ new
@@ -55,19 +55,19 @@
#
# @api private
attr_writer :publisher_index
# Publisher index is an index of the last message since
- # the confirmations were activated, started with 1. It's
- # incremented by 1 after each Basic.Publish starting at 1.
+ # the confirmations were activated, started with 0. It's
+ # incremented by 1 every time a message is published.
# This is done on both client and server, hence this
# acknowledged messages can be matched via its delivery-tag.
#
# @return [Integer] Current publisher index.
# @api public
def publisher_index
- @publisher_index ||= 1
+ @publisher_index ||= 0
end
# Resets publisher index to 0
#
# @api plugin
@@ -79,12 +79,12 @@
# This method is executed after publishing of each message via {Exchage#publish}.
# Currently it just increments publisher index by 1, so messages
# can be actually matched.
#
# @api plugin
- def after_publish(*args)
- self.publisher_index += 1
+ def increment_publisher_index!
+ @publisher_index += 1
end
# Turn on confirmations for this channel and, if given,
# register callback for Confirm.Select-Ok.
#
@@ -102,11 +102,16 @@
if nowait && block
raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
end
@uses_publisher_confirmations = true
+ reset_publisher_index!
+
self.redefine_callback(:confirm_select, &block) unless nowait
+ self.redefine_callback(:after_publish) do
+ increment_publisher_index!
+ end
@connection.send_frame(Protocol::Confirm::Select.encode(@id, nowait))
self
end
@@ -208,41 +213,18 @@
channel = connection.channels[frame.channel]
channel.handle_basic_nack(method)
end
end # self.included(host)
end # ChannelMixin
-
-
- module ExchangeMixin
- # Publish message and then run #after_publish on channel belonging
- # to the exchange. This is used for incrementing the publisher index.
- #
- # @api public
- # @see AMQ::Client::Exchange#publish
- # @see AMQ::Client::Extensions::RabbitMQ::Channel#publisher_index
- # @return [self] self
- def publish(*args, &block)
- super(*args)
- @channel.after_publish(*args, &block)
-
- self
- end # publish
- end # ExchangeMixin
end # Confirm
end # RabbitMQ
end # Extensions
class Channel
# use modules, a native Ruby way of extension of existing classes,
# instead of reckless monkey-patching. MK.
include Extensions::RabbitMQ::Confirm::ChannelMixin
end # Channel
-
- class Exchange
- # use modules, a native Ruby way of extension of existing classes,
- # instead of reckless monkey-patching. MK.
- include Extensions::RabbitMQ::Confirm::ExchangeMixin
- end # Exchange
end # Async
end # Client
end # AMQ