lib/amq/client/async/extensions/rabbitmq/confirm.rb in amq-client-0.9.0 vs lib/amq/client/async/extensions/rabbitmq/confirm.rb in amq-client-0.9.1

- old
+ new

@@ -55,19 +55,19 @@ # # @api private attr_writer :publisher_index # Publisher index is an index of the last message since - # the confirmations were activated, started with 1. It's - # incremented by 1 after each Basic.Publish starting at 1. + # the confirmations were activated, started with 0. It's + # incremented by 1 every time a message is published. # This is done on both client and server, hence this # acknowledged messages can be matched via its delivery-tag. # # @return [Integer] Current publisher index. # @api public def publisher_index - @publisher_index ||= 1 + @publisher_index ||= 0 end # Resets publisher index to 0 # # @api plugin @@ -79,12 +79,12 @@ # This method is executed after publishing of each message via {Exchage#publish}. # Currently it just increments publisher index by 1, so messages # can be actually matched. # # @api plugin - def after_publish(*args) - self.publisher_index += 1 + def increment_publisher_index! + @publisher_index += 1 end # Turn on confirmations for this channel and, if given, # register callback for Confirm.Select-Ok. # @@ -102,11 +102,16 @@ if nowait && block raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense" end @uses_publisher_confirmations = true + reset_publisher_index! + self.redefine_callback(:confirm_select, &block) unless nowait + self.redefine_callback(:after_publish) do + increment_publisher_index! + end @connection.send_frame(Protocol::Confirm::Select.encode(@id, nowait)) self end @@ -208,41 +213,18 @@ channel = connection.channels[frame.channel] channel.handle_basic_nack(method) end end # self.included(host) end # ChannelMixin - - - module ExchangeMixin - # Publish message and then run #after_publish on channel belonging - # to the exchange. This is used for incrementing the publisher index. - # - # @api public - # @see AMQ::Client::Exchange#publish - # @see AMQ::Client::Extensions::RabbitMQ::Channel#publisher_index - # @return [self] self - def publish(*args, &block) - super(*args) - @channel.after_publish(*args, &block) - - self - end # publish - end # ExchangeMixin end # Confirm end # RabbitMQ end # Extensions class Channel # use modules, a native Ruby way of extension of existing classes, # instead of reckless monkey-patching. MK. include Extensions::RabbitMQ::Confirm::ChannelMixin end # Channel - - class Exchange - # use modules, a native Ruby way of extension of existing classes, - # instead of reckless monkey-patching. MK. - include Extensions::RabbitMQ::Confirm::ExchangeMixin - end # Exchange end # Async end # Client end # AMQ