lib/bunny/channel.rb in bunny-0.9.8 vs lib/bunny/channel.rb in bunny-0.10.0

- old
+ new

@@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- require "thread" require "monitor" require "set" +require "bunny/concurrent/atomic_fixnum" require "bunny/consumer_work_pool" require "bunny/exchange" require "bunny/queue" @@ -187,12 +188,16 @@ @threads_waiting_on_continuations = Set.new @threads_waiting_on_confirms_continuations = Set.new @threads_waiting_on_basic_get_continuations = Set.new @next_publish_seq_no = 0 + + @recoveries_counter = Bunny::Concurrent::AtomicFixnum.new(0) end + attr_reader :recoveries_counter + # @private def read_write_timeout @connection.read_write_timeout end @@ -436,22 +441,26 @@ # @see Bunny::Channel#ack # @see Bunny::Channel#nack # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide # @api public def reject(delivery_tag, requeue = false) - basic_reject(delivery_tag, requeue) + guarding_against_stale_delivery_tags(delivery_tag) do + basic_reject(delivery_tag.to_i, requeue) + end end # Acknowledges a message. Acknowledged messages are completely removed from the queue. # # @param [Integer] delivery_tag Delivery tag to acknowledge # @param [Boolean] multiple (false) Should all unacknowledged messages up to this be acknowledged as well? # @see Bunny::Channel#nack # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide # @api public def ack(delivery_tag, multiple = false) - basic_ack(delivery_tag, multiple) + guarding_against_stale_delivery_tags(delivery_tag) do + basic_ack(delivery_tag.to_i, multiple) + end end alias acknowledge ack # Rejects a message. A rejected message can be requeued or # dropped by RabbitMQ. This method is similar to {Bunny::Channel#reject} but @@ -462,11 +471,13 @@ # @param [Boolean] requeue (false) Should this message be requeued instead of dropping it? # @see Bunny::Channel#ack # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide # @api public def nack(delivery_tag, multiple = false, requeue = false) - basic_nack(delivery_tag, multiple, requeue) + guarding_against_stale_delivery_tags(delivery_tag) do + basic_nack(delivery_tag.to_i, multiple, requeue) + end end # @endgroup # @@ -699,10 +710,11 @@ # delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true) # # ack all fetched messages up to payload3 # ch.basic_ack(delivery_info.delivery_tag, true) # # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide + # @see #basic_ack_known_delivery_tag # @api public def basic_ack(delivery_tag, multiple) raise_if_no_longer_open! @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple)) @@ -1405,10 +1417,11 @@ recover_prefetch_setting recover_exchanges # this includes recovering bindings recover_queues recover_consumers + increment_recoveries_counter end # Recovers basic.qos setting. Used by the Automatic Network Failure # Recovery feature. # @@ -1450,10 +1463,15 @@ @consumers.values.dup.each do |c| c.recover_from_network_failure end end + # @private + def increment_recoveries_counter + @recoveries_counter.increment + end + # @endgroup # @return [String] Brief human-readable representation of the channel def to_s @@ -1561,10 +1579,11 @@ method.reply_code == 406 && method.reply_text =~ /unknown delivery tag/ end # @private def handle_basic_get_ok(basic_get_ok, properties, content) + basic_get_ok.delivery_tag = VersionedDeliveryTag.new(basic_get_ok.delivery_tag, @recoveries_counter.get) @basic_get_continuations.push([basic_get_ok, properties, content]) end # @private def handle_basic_get_empty(basic_get_empty) @@ -1612,14 +1631,12 @@ @unconfirmed_set.delete(delivery_tag) end @unconfirmed_set_mutex.synchronize do @only_acks_received = (@only_acks_received && !nack) - @logger.debug "Channel #{@id}: @only_acks_received = #{@only_acks_received.inspect}, nack: #{nack.inspect}" @confirms_continuations.push(true) if @unconfirmed_set.empty? - @confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback end end # @private @@ -1824,7 +1841,23 @@ # @private def new_continuation Concurrent::ContinuationQueue.new end end # if defined? + + # @private + def guarding_against_stale_delivery_tags(tag, &block) + case tag + # if a fixnum was passed, execute unconditionally. MK. + when Fixnum then + block.call + # versioned delivery tags should be checked to avoid + # sending out stale (invalid) tags after channel was reopened + # during network failure recovery. MK. + when VersionedDeliveryTag then + if !tag.stale?(@recoveries_counter.get) + block.call + end + end + end end # Channel end # Bunny