lib/bunny/channel.rb in bunny-0.9.0.pre5 vs lib/bunny/channel.rb in bunny-0.9.0.pre6
- old
+ new
@@ -16,11 +16,11 @@
#
# API
#
attr_accessor :id, :connection, :status, :work_pool
- attr_reader :next_publish_seq_no, :queues, :exchanges
+ attr_reader :next_publish_seq_no, :queues, :exchanges, :unconfirmed_set, :consumers
def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@connection = connection
@id = id || @connection.next_channel_id
@@ -213,10 +213,12 @@
Bunny::Timer.timeout(1, ClientTimeout) do
@last_basic_qos_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
+ @prefetch_count = prefetch_count
+
@last_basic_qos_ok
end
def basic_recover(requeue)
raise_if_no_longer_open!
@@ -570,19 +572,21 @@
@last_tx_rollback_ok
end
# confirm.*
- def confirm_select
+ def confirm_select(callback=nil)
raise_if_no_longer_open!
if @next_publish_seq_no == 0
- @confirms_continuations = []
+ @confirms_continuations = ::Queue.new
@unconfirmed_set = Set.new
@next_publish_seq_no = 1
end
+ @confirms_callback = callback
+
@connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_confirm_select_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
@@ -596,10 +600,51 @@
@only_acks_received
end
#
+ # Recovery
+ #
+
+ def recover_from_network_failure
+ # puts "Recovering channel #{@id} from network failure..."
+ recover_prefetch_setting
+ recover_exchanges
+ # this includes recovering bindings
+ recover_queues
+ recover_consumers
+ end
+
+ def recover_prefetch_setting
+ basic_qos(@prefetch_count) if @prefetch_count
+ end
+
+ def recover_exchanges
+ @exchanges.values.dup.each do |x|
+ x.recover_from_network_failure
+ end
+ end
+
+ def recover_queues
+ @queues.values.dup.each do |q|
+ q.recover_from_network_failure
+ end
+ end
+
+ def recover_consumers
+ unless @consumers.empty?
+ @work_pool = ConsumerWorkPool.new(@work_pool.size)
+ @work_pool.start
+ end
+ @consumers.values.dup.each do |c|
+ c.recover_from_network_failure
+ end
+ end
+
+
+
+ #
# Implementation
#
def register_consumer(consumer_tag, consumer)
@consumer_mutex.synchronize do
@@ -658,14 +703,12 @@
when AMQ::Protocol::Tx::SelectOk then
@continuations.push(method)
when AMQ::Protocol::Confirm::SelectOk then
@continuations.push(method)
when AMQ::Protocol::Basic::Ack then
- # TODO: implement confirm listeners
handle_ack_or_nack(method.delivery_tag, method.multiple, false)
when AMQ::Protocol::Basic::Nack then
- # TODO: implement confirm listeners
handle_ack_or_nack(method.delivery_tag, method.multiple, true)
when AMQ::Protocol::Channel::Close then
# puts "Exception on channel #{@id}: #{method.reply_code} #{method.reply_text}"
closed!
@connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(@id))
@@ -709,50 +752,64 @@
end
end
def handle_ack_or_nack(delivery_tag, multiple, nack)
if multiple
- @unconfirmed_set.delete_if { |i| i < delivery_tag }
+ @unconfirmed_set.delete_if { |i| i <= delivery_tag }
else
@unconfirmed_set.delete(delivery_tag)
end
@unconfirmed_set_mutex.synchronize do
@only_acks_received = (@only_acks_received && !nack)
@confirms_continuations.push(true) if @unconfirmed_set.empty?
+
+ @confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback
end
end
# Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads
# that won't do any real work for channels that do not register consumers (e.g. only used for
# publishing). MK.
def maybe_start_consumer_work_pool!
@work_pool.start unless @work_pool.started?
end
+ def maybe_pause_consumer_work_pool!
+ @work_pool.pause if @work_pool && @work_pool.started?
+ end
+
+ def maybe_kill_consumer_work_pool!
+ @work_pool.kill if @work_pool && @work_pool.started?
+ end
+
def read_next_frame(options = {})
@connection.read_next_frame(options = {})
end
# Synchronizes given block using this channel's mutex.
# @api public
def synchronize(&block)
@publishing_mutex.synchronize(&block)
end
-
+
def deregister_queue(queue)
@queues.delete(queue.name)
end
+ def deregister_queue_named(name)
+ @queues.delete(name)
+ end
+
def register_queue(queue)
@queues[queue.name] = queue
end
def find_queue(name)
@queues[name]
end
-
+
def deregister_exchange(exchange)
@exchanges.delete(exchange.name)
end
def register_exchange(exchange)