lib/bunny/channel.rb in bunny-0.9.0.pre10 vs lib/bunny/channel.rb in bunny-0.9.0.pre11
- old
+ new
@@ -157,10 +157,11 @@
# @param [Bunny::Session] connection AMQP 0.9.1 connection
# @param [Integer] id Channel id, pass nil to make Bunny automatically allocate it
# @param [Bunny::ConsumerWorkPool] work_pool Thread pool for delivery processing, by default of size 1
def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@connection = connection
+ @logger = connection.logger
@id = id || @connection.next_channel_id
@status = :opening
@connection.register_channel(self)
@@ -212,10 +213,11 @@
# {Bunny::Queue}, {Bunny::Exchange} and {Bunny::Consumer} instances.
# @api public
def close
@connection.close_channel(self)
closed!
+ maybe_kill_consumer_work_pool!
end
# @return [Boolean] true if this channel is open, false otherwise
# @api public
def open?
@@ -1279,11 +1281,11 @@
# @api public
def confirm_select(callback=nil)
raise_if_no_longer_open!
if @next_publish_seq_no == 0
- @confirms_continuations = ::Queue.new
+ @confirms_continuations = new_continuation
@unconfirmed_set = Set.new
@nacked_set = Set.new
@next_publish_seq_no = 1
end
@@ -1356,11 +1358,11 @@
# Recovers basic.qos setting, exchanges, queues and consumers. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_from_network_failure
- # puts "Recovering channel #{@id}"
+ @logger.debug "Recovering channel #{@id} after network failure"
release_all_continuations
recover_prefetch_setting
recover_exchanges
# this includes recovering bindings
@@ -1390,11 +1392,11 @@
# Recovery feature.
#
# @api plugin
def recover_queues
@queues.values.dup.each do |q|
- # puts "Recovering queue #{q.name}"
+ @logger.debug "Recovering queue #{q.name}"
q.recover_from_network_failure
end
end
# Recovers consumers. Used by the Automatic Network Failure
@@ -1434,11 +1436,11 @@
end
end
# @private
def handle_method(method)
- # puts "Channel#handle_frame on channel #{@id}: #{method.inspect}"
+ @logger.debug "Channel#handle_frame on channel #{@id}: #{method.inspect}"
case method
when AMQ::Protocol::Queue::DeclareOk then
@continuations.push(method)
when AMQ::Protocol::Queue::DeleteOk then
@continuations.push(method)
@@ -1523,23 +1525,22 @@
if consumer
@work_pool.submit do
consumer.call(DeliveryInfo.new(basic_deliver), MessageProperties.new(properties), content)
end
else
- # TODO: log it
- puts "[warning] No consumer for tag #{basic_deliver.consumer_tag}"
+ @logger.warn "No consumer for tag #{basic_deliver.consumer_tag} on channel #{@id}!"
end
end
# @private
def handle_basic_return(basic_return, properties, content)
x = find_exchange(basic_return.exchange)
if x
x.handle_return(ReturnInfo.new(basic_return), MessageProperties.new(properties), content)
else
- # TODO: log a warning
+ @logger.warn "Exchange #{basic_return.exchange} is not in channel #{@id}'s cache! Dropping returned message!"
end
end
# @private
def handle_ack_or_nack(delivery_tag, multiple, nack)
@@ -1572,16 +1573,17 @@
def wait_on_continuations
if @connection.threaded
t = Thread.current
@threads_waiting_on_continuations << t
- v = @continuations.pop
- @threads_waiting_on_continuations.delete(t)
-
- v
+ begin
+ @continuations.poll(@connection.continuation_timeout)
+ ensure
+ @threads_waiting_on_continuations.delete(t)
+ end
else
- connection.event_loop.run_once until @continuations.length > 0
+ connection.reader_loop.run_once until @continuations.length > 0
@continuations.pop
end
end
@@ -1589,14 +1591,15 @@
def wait_on_basic_get_continuations
if @connection.threaded
t = Thread.current
@threads_waiting_on_basic_get_continuations << t
- v = @basic_get_continuations.pop
- @threads_waiting_on_basic_get_continuations.delete(t)
-
- v
+ begin
+ @basic_get_continuations.poll(@connection.continuation_timeout)
+ ensure
+ @threads_waiting_on_basic_get_continuations.delete(t)
+ end
else
connection.event_loop.run_once until @basic_get_continuations.length > 0
@basic_get_continuations.pop
end
@@ -1606,42 +1609,35 @@
def wait_on_confirms_continuations
if @connection.threaded
t = Thread.current
@threads_waiting_on_confirms_continuations << t
- v = @confirms_continuations.pop
- @threads_waiting_on_confirms_continuations.delete(t)
-
- v
+ begin
+ @confirms_continuations.poll(@connection.continuation_timeout)
+ ensure
+ @threads_waiting_on_confirms_continuations.delete(t)
+ end
else
connection.event_loop.run_once until @confirms_continuations.length > 0
@confirms_continuations.pop
end
end
# Releases all continuations. Used by automatic network recovery.
# @private
def release_all_continuations
- if @confirms_continuations.num_waiting > 0
- @threads_waiting_on_confirms_continuations.each do |t|
- t.run
- end
+ @threads_waiting_on_confirms_continuations.each do |t|
+ t.run
end
- if @continuations.num_waiting > 0
- @threads_waiting_on_continuations.each do |t|
- t.run
- end
+ @threads_waiting_on_continuations.each do |t|
+ t.run
end
- if @basic_get_continuations.num_waiting > 0
- @threads_waiting_on_basic_get_continuations.each do |t|
- t.run
- end
+ @threads_waiting_on_basic_get_continuations.each do |t|
+ t.run
end
- @continuations = ::Queue.new
- @confirms_continuations = ::Queue.new
- @basic_get_continuations = ::Queue.new
+ self.reset_continuations
end
# Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads
# that won't do any real work for channels that do not register consumers (e.g. only used for
# publishing). MK.