lib/bunny/channel.rb in bunny-0.9.0.pre10 vs lib/bunny/channel.rb in bunny-0.9.0.pre11

- old
+ new

@@ -157,10 +157,11 @@ # @param [Bunny::Session] connection AMQP 0.9.1 connection # @param [Integer] id Channel id, pass nil to make Bunny automatically allocate it # @param [Bunny::ConsumerWorkPool] work_pool Thread pool for delivery processing, by default of size 1 def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1)) @connection = connection + @logger = connection.logger @id = id || @connection.next_channel_id @status = :opening @connection.register_channel(self) @@ -212,10 +213,11 @@ # {Bunny::Queue}, {Bunny::Exchange} and {Bunny::Consumer} instances. # @api public def close @connection.close_channel(self) closed! + maybe_kill_consumer_work_pool! end # @return [Boolean] true if this channel is open, false otherwise # @api public def open? @@ -1279,11 +1281,11 @@ # @api public def confirm_select(callback=nil) raise_if_no_longer_open! if @next_publish_seq_no == 0 - @confirms_continuations = ::Queue.new + @confirms_continuations = new_continuation @unconfirmed_set = Set.new @nacked_set = Set.new @next_publish_seq_no = 1 end @@ -1356,11 +1358,11 @@ # Recovers basic.qos setting, exchanges, queues and consumers. Used by the Automatic Network Failure # Recovery feature. # # @api plugin def recover_from_network_failure - # puts "Recovering channel #{@id}" + @logger.debug "Recovering channel #{@id} after network failure" release_all_continuations recover_prefetch_setting recover_exchanges # this includes recovering bindings @@ -1390,11 +1392,11 @@ # Recovery feature. # # @api plugin def recover_queues @queues.values.dup.each do |q| - # puts "Recovering queue #{q.name}" + @logger.debug "Recovering queue #{q.name}" q.recover_from_network_failure end end # Recovers consumers. Used by the Automatic Network Failure @@ -1434,11 +1436,11 @@ end end # @private def handle_method(method) - # puts "Channel#handle_frame on channel #{@id}: #{method.inspect}" + @logger.debug "Channel#handle_frame on channel #{@id}: #{method.inspect}" case method when AMQ::Protocol::Queue::DeclareOk then @continuations.push(method) when AMQ::Protocol::Queue::DeleteOk then @continuations.push(method) @@ -1523,23 +1525,22 @@ if consumer @work_pool.submit do consumer.call(DeliveryInfo.new(basic_deliver), MessageProperties.new(properties), content) end else - # TODO: log it - puts "[warning] No consumer for tag #{basic_deliver.consumer_tag}" + @logger.warn "No consumer for tag #{basic_deliver.consumer_tag} on channel #{@id}!" end end # @private def handle_basic_return(basic_return, properties, content) x = find_exchange(basic_return.exchange) if x x.handle_return(ReturnInfo.new(basic_return), MessageProperties.new(properties), content) else - # TODO: log a warning + @logger.warn "Exchange #{basic_return.exchange} is not in channel #{@id}'s cache! Dropping returned message!" end end # @private def handle_ack_or_nack(delivery_tag, multiple, nack) @@ -1572,16 +1573,17 @@ def wait_on_continuations if @connection.threaded t = Thread.current @threads_waiting_on_continuations << t - v = @continuations.pop - @threads_waiting_on_continuations.delete(t) - - v + begin + @continuations.poll(@connection.continuation_timeout) + ensure + @threads_waiting_on_continuations.delete(t) + end else - connection.event_loop.run_once until @continuations.length > 0 + connection.reader_loop.run_once until @continuations.length > 0 @continuations.pop end end @@ -1589,14 +1591,15 @@ def wait_on_basic_get_continuations if @connection.threaded t = Thread.current @threads_waiting_on_basic_get_continuations << t - v = @basic_get_continuations.pop - @threads_waiting_on_basic_get_continuations.delete(t) - - v + begin + @basic_get_continuations.poll(@connection.continuation_timeout) + ensure + @threads_waiting_on_basic_get_continuations.delete(t) + end else connection.event_loop.run_once until @basic_get_continuations.length > 0 @basic_get_continuations.pop end @@ -1606,42 +1609,35 @@ def wait_on_confirms_continuations if @connection.threaded t = Thread.current @threads_waiting_on_confirms_continuations << t - v = @confirms_continuations.pop - @threads_waiting_on_confirms_continuations.delete(t) - - v + begin + @confirms_continuations.poll(@connection.continuation_timeout) + ensure + @threads_waiting_on_confirms_continuations.delete(t) + end else connection.event_loop.run_once until @confirms_continuations.length > 0 @confirms_continuations.pop end end # Releases all continuations. Used by automatic network recovery. # @private def release_all_continuations - if @confirms_continuations.num_waiting > 0 - @threads_waiting_on_confirms_continuations.each do |t| - t.run - end + @threads_waiting_on_confirms_continuations.each do |t| + t.run end - if @continuations.num_waiting > 0 - @threads_waiting_on_continuations.each do |t| - t.run - end + @threads_waiting_on_continuations.each do |t| + t.run end - if @basic_get_continuations.num_waiting > 0 - @threads_waiting_on_basic_get_continuations.each do |t| - t.run - end + @threads_waiting_on_basic_get_continuations.each do |t| + t.run end - @continuations = ::Queue.new - @confirms_continuations = ::Queue.new - @basic_get_continuations = ::Queue.new + self.reset_continuations end # Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads # that won't do any real work for channels that do not register consumers (e.g. only used for # publishing). MK.