lib/amq/client/async/channel.rb in amq-client-0.7.0.alpha35 vs lib/amq/client/async/channel.rb in amq-client-0.8.0

- old
+ new

@@ -23,35 +23,34 @@ # # API # - class ChannelOutOfBadError < StandardError # TODO: inherit from some AMQP error class defined in amq-protocol or use it straight away. - def initialize(max, given) - super("Channel max is #{max}, #{given} given.") - end - end - - DEFAULT_REPLY_TEXT = "Goodbye".freeze attr_reader :id attr_reader :exchanges_awaiting_declare_ok, :exchanges_awaiting_delete_ok - attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_consume_ok, :queues_awaiting_cancel_ok, :queues_awaiting_get_response + attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_get_response + attr_reader :consumers_awaiting_consume_ok, :consumers_awaiting_cancel_ok attr_accessor :flow_is_active - def initialize(connection, id) + def initialize(connection, id, options = {}) super(connection) @id = id @exchanges = Hash.new @queues = Hash.new @consumers = Hash.new + @options = { :auto_recovery => connection.auto_recovering? }.merge(options) + @auto_recovery = (!!@options[:auto_recovery]) + # we must synchronize frameset delivery. MK. + @mutex = Mutex.new + reset_state! # 65536 is here for cases when channel is opened without passing a callback in, # otherwise channel_mix would be nil and it causes a lot of needless headaches. # lets just have this default. MK. @@ -60,14 +59,21 @@ else 65536 end if channel_max != 0 && !(0..channel_max).include?(id) - raise ChannelOutOfBadError.new(channel_max, id) + raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{id}") end end + # @return [Boolean] true if this channel uses automatic recovery mode + def auto_recovering? + @auto_recovery + end # auto_recovering? + + + # @return [Hash<String, Consumer>] def consumers @consumers end # consumers # @return [Array<Queue>] Collection of queues that were declared on this channel. @@ -86,11 +92,17 @@ # @return [AMQ::Client::Connection] Connection this channel belongs to. def connection @connection end # connection + # Synchronizes given block using this channel's mutex. + # @api public + def synchronize(&block) + @mutex.synchronize(&block) + end + # @group Channel lifecycle # Opens AMQP channel. # # @api public @@ -99,10 +111,11 @@ @connection.channels[@id] = self self.status = :opening self.redefine_callback :open, &block end + alias reopen open # Closes AMQP channel. # # @api public def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) @@ -243,10 +256,81 @@ # @api public def on_error(&block) self.define_callback(:error, &block) end + + # Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). + # Only one callback can be defined (the one defined last replaces previously added ones). + # + # @api public + def on_connection_interruption(&block) + self.redefine_callback(:after_connection_interruption, &block) + end # on_connection_interruption(&block) + alias after_connection_interruption on_connection_interruption + + # @private + def handle_connection_interruption(method = nil) + @queues.each { |name, q| q.handle_connection_interruption(method) } + @exchanges.each { |name, e| e.handle_connection_interruption(method) } + + self.exec_callback_yielding_self(:after_connection_interruption) + self.reset_state! + end # handle_connection_interruption + + + # Defines a callback that will be executed after TCP connection has recovered after a network failure + # but before AMQP connection is re-opened. + # Only one callback can be defined (the one defined last replaces previously added ones). + # + # @api public + def before_recovery(&block) + self.redefine_callback(:before_recovery, &block) + end # before_recovery(&block) + + # @private + def run_before_recovery_callbacks + self.exec_callback_yielding_self(:before_recovery) + + @queues.each { |name, q| q.run_before_recovery_callbacks } + @exchanges.each { |name, e| e.run_before_recovery_callbacks } + end + + + + # Defines a callback that will be executed after AMQP connection has recovered after a network failure. + # Only one callback can be defined (the one defined last replaces previously added ones). + # + # @api public + def on_recovery(&block) + self.redefine_callback(:after_recovery, &block) + end # on_recovery(&block) + alias after_recovery on_recovery + + # @private + def run_after_recovery_callbacks + self.exec_callback_yielding_self(:after_recovery) + + @queues.each { |name, q| q.run_after_recovery_callbacks } + @exchanges.each { |name, e| e.run_after_recovery_callbacks } + end + + + # Called by associated connection object when AMQP connection has been re-established + # (for example, after a network failure). + # + # @api plugin + def auto_recover + return unless auto_recovering? + + self.open do + # exchanges must be recovered first because queue recovery includes recovery of bindings. MK. + @exchanges.each { |name, e| e.auto_recover } + @queues.each { |name, q| q.auto_recover } + end + end # auto_recover + # @endgroup # # Implementation @@ -266,21 +350,30 @@ # @api plugin def find_exchange(name) @exchanges[name] end + # @api plugin + # @private def register_queue(queue) raise ArgumentError, "argument is nil!" if queue.nil? @queues[queue.name] = queue end # register_queue(queue) + # @api plugin + # @private def find_queue(name) @queues[name] end + RECOVERY_EVENTS = [:after_connection_interruption, :before_recovery, :after_recovery].freeze + + + # @api plugin + # @private def reset_state! @flow_is_active = true @queues_awaiting_declare_ok = Array.new @exchanges_awaiting_declare_ok = Array.new @@ -289,35 +382,35 @@ @exchanges_awaiting_delete_ok = Array.new @queues_awaiting_purge_ok = Array.new @queues_awaiting_bind_ok = Array.new @queues_awaiting_unbind_ok = Array.new - @queues_awaiting_consume_ok = Array.new - @queues_awaiting_cancel_ok = Array.new + @consumers_awaiting_consume_ok = Array.new + @consumers_awaiting_cancel_ok = Array.new @queues_awaiting_get_response = Array.new - @callbacks = Hash.new + @callbacks = @callbacks.delete_if { |k, v| !RECOVERY_EVENTS.include?(k) } end # reset_state! - def handle_connection_interruption(method = nil) - self.reset_state! - end # handle_connection_interruption - - - + # @api plugin + # @private def handle_open_ok(open_ok) self.status = :opened self.exec_callback_once_yielding_self(:open, open_ok) end + # @api plugin + # @private def handle_close_ok(close_ok) self.status = :closed self.exec_callback_once_yielding_self(:close, close_ok) end + # @api plugin + # @private def handle_close(channel_close) self.status = :closed self.exec_callback_yielding_self(:error, channel_close) self.handle_connection_interruption(channel_close) @@ -377,9 +470,9 @@ self.handle(Protocol::Tx::RollbackOk) do |connection, frame| channel = connection.channels[frame.channel] channel.exec_callback(:tx_rollback, frame.decode_payload) end - end # Channel + end # Channel end # Async end # Client end # AMQ