lib/amq/client/async/channel.rb in amq-client-0.7.0.alpha35 vs lib/amq/client/async/channel.rb in amq-client-0.8.0
- old
+ new
@@ -23,35 +23,34 @@
#
# API
#
- class ChannelOutOfBadError < StandardError # TODO: inherit from some AMQP error class defined in amq-protocol or use it straight away.
- def initialize(max, given)
- super("Channel max is #{max}, #{given} given.")
- end
- end
-
-
DEFAULT_REPLY_TEXT = "Goodbye".freeze
attr_reader :id
attr_reader :exchanges_awaiting_declare_ok, :exchanges_awaiting_delete_ok
- attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_consume_ok, :queues_awaiting_cancel_ok, :queues_awaiting_get_response
+ attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_get_response
+ attr_reader :consumers_awaiting_consume_ok, :consumers_awaiting_cancel_ok
attr_accessor :flow_is_active
- def initialize(connection, id)
+ def initialize(connection, id, options = {})
super(connection)
@id = id
@exchanges = Hash.new
@queues = Hash.new
@consumers = Hash.new
+ @options = { :auto_recovery => connection.auto_recovering? }.merge(options)
+ @auto_recovery = (!!@options[:auto_recovery])
+ # we must synchronize frameset delivery. MK.
+ @mutex = Mutex.new
+
reset_state!
# 65536 is here for cases when channel is opened without passing a callback in,
# otherwise channel_mix would be nil and it causes a lot of needless headaches.
# lets just have this default. MK.
@@ -60,14 +59,21 @@
else
65536
end
if channel_max != 0 && !(0..channel_max).include?(id)
- raise ChannelOutOfBadError.new(channel_max, id)
+ raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{id}")
end
end
+ # @return [Boolean] true if this channel uses automatic recovery mode
+ def auto_recovering?
+ @auto_recovery
+ end # auto_recovering?
+
+
+ # @return [Hash<String, Consumer>]
def consumers
@consumers
end # consumers
# @return [Array<Queue>] Collection of queues that were declared on this channel.
@@ -86,11 +92,17 @@
# @return [AMQ::Client::Connection] Connection this channel belongs to.
def connection
@connection
end # connection
+ # Synchronizes given block using this channel's mutex.
+ # @api public
+ def synchronize(&block)
+ @mutex.synchronize(&block)
+ end
+
# @group Channel lifecycle
# Opens AMQP channel.
#
# @api public
@@ -99,10 +111,11 @@
@connection.channels[@id] = self
self.status = :opening
self.redefine_callback :open, &block
end
+ alias reopen open
# Closes AMQP channel.
#
# @api public
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
@@ -243,10 +256,81 @@
# @api public
def on_error(&block)
self.define_callback(:error, &block)
end
+
+ # Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ def on_connection_interruption(&block)
+ self.redefine_callback(:after_connection_interruption, &block)
+ end # on_connection_interruption(&block)
+ alias after_connection_interruption on_connection_interruption
+
+ # @private
+ def handle_connection_interruption(method = nil)
+ @queues.each { |name, q| q.handle_connection_interruption(method) }
+ @exchanges.each { |name, e| e.handle_connection_interruption(method) }
+
+ self.exec_callback_yielding_self(:after_connection_interruption)
+ self.reset_state!
+ end # handle_connection_interruption
+
+
+ # Defines a callback that will be executed after TCP connection has recovered after a network failure
+ # but before AMQP connection is re-opened.
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ def before_recovery(&block)
+ self.redefine_callback(:before_recovery, &block)
+ end # before_recovery(&block)
+
+ # @private
+ def run_before_recovery_callbacks
+ self.exec_callback_yielding_self(:before_recovery)
+
+ @queues.each { |name, q| q.run_before_recovery_callbacks }
+ @exchanges.each { |name, e| e.run_before_recovery_callbacks }
+ end
+
+
+
+ # Defines a callback that will be executed after AMQP connection has recovered after a network failure.
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ def on_recovery(&block)
+ self.redefine_callback(:after_recovery, &block)
+ end # on_recovery(&block)
+ alias after_recovery on_recovery
+
+ # @private
+ def run_after_recovery_callbacks
+ self.exec_callback_yielding_self(:after_recovery)
+
+ @queues.each { |name, q| q.run_after_recovery_callbacks }
+ @exchanges.each { |name, e| e.run_after_recovery_callbacks }
+ end
+
+
+ # Called by associated connection object when AMQP connection has been re-established
+ # (for example, after a network failure).
+ #
+ # @api plugin
+ def auto_recover
+ return unless auto_recovering?
+
+ self.open do
+ # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
+ @exchanges.each { |name, e| e.auto_recover }
+ @queues.each { |name, q| q.auto_recover }
+ end
+ end # auto_recover
+
# @endgroup
#
# Implementation
@@ -266,21 +350,30 @@
# @api plugin
def find_exchange(name)
@exchanges[name]
end
+ # @api plugin
+ # @private
def register_queue(queue)
raise ArgumentError, "argument is nil!" if queue.nil?
@queues[queue.name] = queue
end # register_queue(queue)
+ # @api plugin
+ # @private
def find_queue(name)
@queues[name]
end
+ RECOVERY_EVENTS = [:after_connection_interruption, :before_recovery, :after_recovery].freeze
+
+
+ # @api plugin
+ # @private
def reset_state!
@flow_is_active = true
@queues_awaiting_declare_ok = Array.new
@exchanges_awaiting_declare_ok = Array.new
@@ -289,35 +382,35 @@
@exchanges_awaiting_delete_ok = Array.new
@queues_awaiting_purge_ok = Array.new
@queues_awaiting_bind_ok = Array.new
@queues_awaiting_unbind_ok = Array.new
- @queues_awaiting_consume_ok = Array.new
- @queues_awaiting_cancel_ok = Array.new
+ @consumers_awaiting_consume_ok = Array.new
+ @consumers_awaiting_cancel_ok = Array.new
@queues_awaiting_get_response = Array.new
- @callbacks = Hash.new
+ @callbacks = @callbacks.delete_if { |k, v| !RECOVERY_EVENTS.include?(k) }
end # reset_state!
- def handle_connection_interruption(method = nil)
- self.reset_state!
- end # handle_connection_interruption
-
-
-
+ # @api plugin
+ # @private
def handle_open_ok(open_ok)
self.status = :opened
self.exec_callback_once_yielding_self(:open, open_ok)
end
+ # @api plugin
+ # @private
def handle_close_ok(close_ok)
self.status = :closed
self.exec_callback_once_yielding_self(:close, close_ok)
end
+ # @api plugin
+ # @private
def handle_close(channel_close)
self.status = :closed
self.exec_callback_yielding_self(:error, channel_close)
self.handle_connection_interruption(channel_close)
@@ -377,9 +470,9 @@
self.handle(Protocol::Tx::RollbackOk) do |connection, frame|
channel = connection.channels[frame.channel]
channel.exec_callback(:tx_rollback, frame.decode_payload)
end
- end # Channel
+ end # Channel
end # Async
end # Client
end # AMQ