lib/amqp/channel.rb in amqp-0.8.0.rc13 vs lib/amqp/channel.rb in amqp-0.8.0.rc14
- old
+ new
@@ -1,7 +1,8 @@
# encoding: utf-8
+require "amqp/int_allocator"
require "amqp/exchange"
require "amqp/queue"
module AMQP
# h2. What are AMQP channels
@@ -154,13 +155,10 @@
# Status of this channel (one of: :opening, :closing, :open, :closed)
# @return [Symbol]
attr_reader :status
- # @note We encourage you to not rely on default AMQP connection and pass connection parameter
- # explicitly.
- #
# @param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP
# connection (accessible via {AMQP.connection}) will be used.
# @param [Integer] id Channel id. Must not be greater than max channel id client and broker
# negotiated on during connection setup. Almost always the right thing to do
# is to let AMQP gem pick channel identifier for you. If you want to get next
@@ -192,10 +190,11 @@
# end
# end
#
#
# @option options [Boolean] :prefetch (nil) Specifies number of messages to prefetch. Channel-specific. See {AMQP::Channel#prefetch}.
+ # @option options [Boolean] :auto_recovery (nil) Turns on automatic network failure recovery mode for this channel.
#
# @yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional.
# @yieldparam [Channel] channel Channel that is successfully open
# @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP channel.open-ok) instance
#
@@ -203,18 +202,22 @@
# @see AMQP::Channel#prefetch
# @api public
def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?
- @options = options
@connection = connection || AMQP.connection || AMQP.start
+ # this means 2nd argument is options
+ if id.kind_of?(Hash)
+ options = options.merge(id)
+ id = self.class.next_channel_id
+ end
- super(@connection, id)
+ super(@connection, id, options)
@rpcs = Hash.new
# we need this deferrable to mimic what AMQP gem 0.7 does to enable
- # the following (HIGHLY discouraged) style of programming some people use in their
+ # the following (pseudo-synchronous) style of programming some people use in their
# existing codebases:
#
# connection = AMQP.connect
# channel = AMQP::Channel.new(connection)
# queue = AMQP::Queue.new(channel)
@@ -240,12 +243,38 @@
self.prefetch(options[:prefetch], false) if options[:prefetch]
end # self.open
end # @connection.on_open
end
+ # @return [Boolean] true if this channel is in automatic recovery mode
+ # @see #auto_recovering?
+ attr_accessor :auto_recovery
+ # @return [Boolean] true if this channel uses automatic recovery mode
+ def auto_recovering?
+ @auto_recovery
+ end # auto_recovering?
+ # Called by associated connection object when AMQP connection has been re-established
+ # (for example, after a network failure).
+ #
+ # @api plugin
+ def auto_recover
+ return unless auto_recovering?
+
+ self.open do
+ @channel_is_open_deferrable.succeed
+
+ # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
+ @exchanges.each { |name, e| e.auto_recover }
+ @queues.each { |name, q| q.auto_recover }
+ end
+ end # auto_recover
+
+
+
+
# @group Declaring exchanges
# Defines, intializes and returns a direct Exchange instance.
#
# Learn more about direct exchanges in {Exchange Exchange class documentation}.
@@ -751,10 +780,12 @@
# @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.4)
#
# @return [Queue]
# @api public
def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
+ raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?
+
if name && !name.empty? && (queue = find_queue(name))
extended_opts = Queue.add_default_options(name, opts, block)
validate_parameters_match!(queue, extended_opts)
@@ -789,10 +820,17 @@
end
register_queue(queue)
end
+ # @return [Array<AMQP::Queue>] Queues cache for this channel
+ # @api plugin
+ # @private
+ def queues
+ @queues
+ end # queues
+
# @endgroup
# Instantiates and returns an RPC instance associated with this channel.
@@ -870,11 +908,14 @@
# Closes AMQP channel.
#
# @api public
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
- super(reply_code, reply_text, class_id, method_id, &block)
+ r = super(reply_code, reply_text, class_id, method_id, &block)
+ self.class.release_channel_id(@id)
+
+ r
end
# @endgroup
@@ -1088,32 +1129,71 @@
# Overrides superclass method to also re-create @channel_is_open_deferrable
#
# @api plugin
# @private
- def handle_connection_interruption(exception = nil)
- super(exception)
+ def handle_connection_interruption(reason = nil)
+ super(reason)
+
+ self.class.release_channel_id(@id) unless auto_recovering?
@channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
end
# @private
# @api private
def self.channel_id_mutex
@channel_id_mutex ||= Mutex.new
end
- # Returns incrementing channel id. This method is thread safe.
+ # Returns next available channel id. This method is thread safe.
+ #
# @return [Fixnum]
# @api public
+ # @see Channel.release_channel_id
+ # @see Channel.reset_channel_id_allocator
def self.next_channel_id
channel_id_mutex.synchronize do
- @last_channel_id ||= 0
- @last_channel_id += 1
+ self.initialize_channel_id_allocator
- @last_channel_id
+ @int_allocator.allocate
end
end
+
+ # Releases previously allocated channel id. This method is thread safe.
+ #
+ # @param [Fixnum] Channel id to release
+ # @api public
+ # @see Channel.next_channel_id
+ # @see Channel.reset_channel_id_allocator
+ def self.release_channel_id(i)
+ channel_id_mutex.synchronize do
+ self.initialize_channel_id_allocator
+
+ @int_allocator.release(i)
+ end
+ end # self.release_channel_id(i)
+
+ # Resets channel allocator. This method is thread safe.
+ # @api public
+ # @see Channel.next_channel_id
+ # @see Channel.release_channel_id
+ def self.reset_channel_id_allocator
+ channel_id_mutex.synchronize do
+ initialize_channel_id_allocator
+
+ @int_allocator.reset
+ end
+ end # self.reset_channel_id_allocator
+
+
+ # @private
+ def self.initialize_channel_id_allocator
+ # TODO: ideally, this should be in agreement with agreed max number of channels of the connection,
+ # but it is possible that value either not yet available. MK.
+ max_channel = (1 << 16) - 1
+ @int_allocator ||= IntAllocator.new(1, max_channel)
+ end # self.initialize_channel_id_allocator
# @private
# @api plugin
def register_rpc(rpc)
raise ArgumentError, "argument is nil!" unless rpc