lib/bunny/channel.rb in bunny-0.9.0.pre5 vs lib/bunny/channel.rb in bunny-0.9.0.pre6

- old
+ new

@@ -16,11 +16,11 @@ # # API # attr_accessor :id, :connection, :status, :work_pool - attr_reader :next_publish_seq_no, :queues, :exchanges + attr_reader :next_publish_seq_no, :queues, :exchanges, :unconfirmed_set, :consumers def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1)) @connection = connection @id = id || @connection.next_channel_id @@ -213,10 +213,12 @@ Bunny::Timer.timeout(1, ClientTimeout) do @last_basic_qos_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! + @prefetch_count = prefetch_count + @last_basic_qos_ok end def basic_recover(requeue) raise_if_no_longer_open! @@ -570,19 +572,21 @@ @last_tx_rollback_ok end # confirm.* - def confirm_select + def confirm_select(callback=nil) raise_if_no_longer_open! if @next_publish_seq_no == 0 - @confirms_continuations = [] + @confirms_continuations = ::Queue.new @unconfirmed_set = Set.new @next_publish_seq_no = 1 end + @confirms_callback = callback + @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false)) Bunny::Timer.timeout(1, ClientTimeout) do @last_confirm_select_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! @@ -596,10 +600,51 @@ @only_acks_received end # + # Recovery + # + + def recover_from_network_failure + # puts "Recovering channel #{@id} from network failure..." + recover_prefetch_setting + recover_exchanges + # this includes recovering bindings + recover_queues + recover_consumers + end + + def recover_prefetch_setting + basic_qos(@prefetch_count) if @prefetch_count + end + + def recover_exchanges + @exchanges.values.dup.each do |x| + x.recover_from_network_failure + end + end + + def recover_queues + @queues.values.dup.each do |q| + q.recover_from_network_failure + end + end + + def recover_consumers + unless @consumers.empty? + @work_pool = ConsumerWorkPool.new(@work_pool.size) + @work_pool.start + end + @consumers.values.dup.each do |c| + c.recover_from_network_failure + end + end + + + + # # Implementation # def register_consumer(consumer_tag, consumer) @consumer_mutex.synchronize do @@ -658,14 +703,12 @@ when AMQ::Protocol::Tx::SelectOk then @continuations.push(method) when AMQ::Protocol::Confirm::SelectOk then @continuations.push(method) when AMQ::Protocol::Basic::Ack then - # TODO: implement confirm listeners handle_ack_or_nack(method.delivery_tag, method.multiple, false) when AMQ::Protocol::Basic::Nack then - # TODO: implement confirm listeners handle_ack_or_nack(method.delivery_tag, method.multiple, true) when AMQ::Protocol::Channel::Close then # puts "Exception on channel #{@id}: #{method.reply_code} #{method.reply_text}" closed! @connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(@id)) @@ -709,50 +752,64 @@ end end def handle_ack_or_nack(delivery_tag, multiple, nack) if multiple - @unconfirmed_set.delete_if { |i| i < delivery_tag } + @unconfirmed_set.delete_if { |i| i <= delivery_tag } else @unconfirmed_set.delete(delivery_tag) end @unconfirmed_set_mutex.synchronize do @only_acks_received = (@only_acks_received && !nack) @confirms_continuations.push(true) if @unconfirmed_set.empty? + + @confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback end end # Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads # that won't do any real work for channels that do not register consumers (e.g. only used for # publishing). MK. def maybe_start_consumer_work_pool! @work_pool.start unless @work_pool.started? end + def maybe_pause_consumer_work_pool! + @work_pool.pause if @work_pool && @work_pool.started? + end + + def maybe_kill_consumer_work_pool! + @work_pool.kill if @work_pool && @work_pool.started? + end + def read_next_frame(options = {}) @connection.read_next_frame(options = {}) end # Synchronizes given block using this channel's mutex. # @api public def synchronize(&block) @publishing_mutex.synchronize(&block) end - + def deregister_queue(queue) @queues.delete(queue.name) end + def deregister_queue_named(name) + @queues.delete(name) + end + def register_queue(queue) @queues[queue.name] = queue end def find_queue(name) @queues[name] end - + def deregister_exchange(exchange) @exchanges.delete(exchange.name) end def register_exchange(exchange)