lib/bunny/channel.rb in bunny-0.9.8 vs lib/bunny/channel.rb in bunny-0.10.0
- old
+ new
@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
require "thread"
require "monitor"
require "set"
+require "bunny/concurrent/atomic_fixnum"
require "bunny/consumer_work_pool"
require "bunny/exchange"
require "bunny/queue"
@@ -187,12 +188,16 @@
@threads_waiting_on_continuations = Set.new
@threads_waiting_on_confirms_continuations = Set.new
@threads_waiting_on_basic_get_continuations = Set.new
@next_publish_seq_no = 0
+
+ @recoveries_counter = Bunny::Concurrent::AtomicFixnum.new(0)
end
+ attr_reader :recoveries_counter
+
# @private
def read_write_timeout
@connection.read_write_timeout
end
@@ -436,22 +441,26 @@
# @see Bunny::Channel#ack
# @see Bunny::Channel#nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def reject(delivery_tag, requeue = false)
- basic_reject(delivery_tag, requeue)
+ guarding_against_stale_delivery_tags(delivery_tag) do
+ basic_reject(delivery_tag.to_i, requeue)
+ end
end
# Acknowledges a message. Acknowledged messages are completely removed from the queue.
#
# @param [Integer] delivery_tag Delivery tag to acknowledge
# @param [Boolean] multiple (false) Should all unacknowledged messages up to this be acknowledged as well?
# @see Bunny::Channel#nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def ack(delivery_tag, multiple = false)
- basic_ack(delivery_tag, multiple)
+ guarding_against_stale_delivery_tags(delivery_tag) do
+ basic_ack(delivery_tag.to_i, multiple)
+ end
end
alias acknowledge ack
# Rejects a message. A rejected message can be requeued or
# dropped by RabbitMQ. This method is similar to {Bunny::Channel#reject} but
@@ -462,11 +471,13 @@
# @param [Boolean] requeue (false) Should this message be requeued instead of dropping it?
# @see Bunny::Channel#ack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def nack(delivery_tag, multiple = false, requeue = false)
- basic_nack(delivery_tag, multiple, requeue)
+ guarding_against_stale_delivery_tags(delivery_tag) do
+ basic_nack(delivery_tag.to_i, multiple, requeue)
+ end
end
# @endgroup
#
@@ -699,10 +710,11 @@
# delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
# # ack all fetched messages up to payload3
# ch.basic_ack(delivery_info.delivery_tag, true)
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @see #basic_ack_known_delivery_tag
# @api public
def basic_ack(delivery_tag, multiple)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))
@@ -1405,10 +1417,11 @@
recover_prefetch_setting
recover_exchanges
# this includes recovering bindings
recover_queues
recover_consumers
+ increment_recoveries_counter
end
# Recovers basic.qos setting. Used by the Automatic Network Failure
# Recovery feature.
#
@@ -1450,10 +1463,15 @@
@consumers.values.dup.each do |c|
c.recover_from_network_failure
end
end
+ # @private
+ def increment_recoveries_counter
+ @recoveries_counter.increment
+ end
+
# @endgroup
# @return [String] Brief human-readable representation of the channel
def to_s
@@ -1561,10 +1579,11 @@
method.reply_code == 406 && method.reply_text =~ /unknown delivery tag/
end
# @private
def handle_basic_get_ok(basic_get_ok, properties, content)
+ basic_get_ok.delivery_tag = VersionedDeliveryTag.new(basic_get_ok.delivery_tag, @recoveries_counter.get)
@basic_get_continuations.push([basic_get_ok, properties, content])
end
# @private
def handle_basic_get_empty(basic_get_empty)
@@ -1612,14 +1631,12 @@
@unconfirmed_set.delete(delivery_tag)
end
@unconfirmed_set_mutex.synchronize do
@only_acks_received = (@only_acks_received && !nack)
- @logger.debug "Channel #{@id}: @only_acks_received = #{@only_acks_received.inspect}, nack: #{nack.inspect}"
@confirms_continuations.push(true) if @unconfirmed_set.empty?
-
@confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback
end
end
# @private
@@ -1824,7 +1841,23 @@
# @private
def new_continuation
Concurrent::ContinuationQueue.new
end
end # if defined?
+
+ # @private
+ def guarding_against_stale_delivery_tags(tag, &block)
+ case tag
+ # if a fixnum was passed, execute unconditionally. MK.
+ when Fixnum then
+ block.call
+ # versioned delivery tags should be checked to avoid
+ # sending out stale (invalid) tags after channel was reopened
+ # during network failure recovery. MK.
+ when VersionedDeliveryTag then
+ if !tag.stale?(@recoveries_counter.get)
+ block.call
+ end
+ end
+ end
end # Channel
end # Bunny