lib/amqp/channel.rb in amqp-0.8.0.rc13 vs lib/amqp/channel.rb in amqp-0.8.0.rc14

- old
+ new

@@ -1,7 +1,8 @@ # encoding: utf-8 +require "amqp/int_allocator" require "amqp/exchange" require "amqp/queue" module AMQP # h2. What are AMQP channels @@ -154,13 +155,10 @@ # Status of this channel (one of: :opening, :closing, :open, :closed) # @return [Symbol] attr_reader :status - # @note We encourage you to not rely on default AMQP connection and pass connection parameter - # explicitly. - # # @param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP # connection (accessible via {AMQP.connection}) will be used. # @param [Integer] id Channel id. Must not be greater than max channel id client and broker # negotiated on during connection setup. Almost always the right thing to do # is to let AMQP gem pick channel identifier for you. If you want to get next @@ -192,10 +190,11 @@ # end # end # # # @option options [Boolean] :prefetch (nil) Specifies number of messages to prefetch. Channel-specific. See {AMQP::Channel#prefetch}. + # @option options [Boolean] :auto_recovery (nil) Turns on automatic network failure recovery mode for this channel. # # @yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional. # @yieldparam [Channel] channel Channel that is successfully open # @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP channel.open-ok) instance # @@ -203,18 +202,22 @@ # @see AMQP::Channel#prefetch # @api public def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block) raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running? - @options = options @connection = connection || AMQP.connection || AMQP.start + # this means 2nd argument is options + if id.kind_of?(Hash) + options = options.merge(id) + id = self.class.next_channel_id + end - super(@connection, id) + super(@connection, id, options) @rpcs = Hash.new # we need this deferrable to mimic what AMQP gem 0.7 does to enable - # the following (HIGHLY discouraged) style of programming some people use in their + # the following (pseudo-synchronous) style of programming some people use in their # existing codebases: # # connection = AMQP.connect # channel = AMQP::Channel.new(connection) # queue = AMQP::Queue.new(channel) @@ -240,12 +243,38 @@ self.prefetch(options[:prefetch], false) if options[:prefetch] end # self.open end # @connection.on_open end + # @return [Boolean] true if this channel is in automatic recovery mode + # @see #auto_recovering? + attr_accessor :auto_recovery + # @return [Boolean] true if this channel uses automatic recovery mode + def auto_recovering? + @auto_recovery + end # auto_recovering? + # Called by associated connection object when AMQP connection has been re-established + # (for example, after a network failure). + # + # @api plugin + def auto_recover + return unless auto_recovering? + + self.open do + @channel_is_open_deferrable.succeed + + # exchanges must be recovered first because queue recovery includes recovery of bindings. MK. + @exchanges.each { |name, e| e.auto_recover } + @queues.each { |name, q| q.auto_recover } + end + end # auto_recover + + + + # @group Declaring exchanges # Defines, intializes and returns a direct Exchange instance. # # Learn more about direct exchanges in {Exchange Exchange class documentation}. @@ -751,10 +780,12 @@ # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.4) # # @return [Queue] # @api public def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) + raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? + if name && !name.empty? && (queue = find_queue(name)) extended_opts = Queue.add_default_options(name, opts, block) validate_parameters_match!(queue, extended_opts) @@ -789,10 +820,17 @@ end register_queue(queue) end + # @return [Array<AMQP::Queue>] Queues cache for this channel + # @api plugin + # @private + def queues + @queues + end # queues + # @endgroup # Instantiates and returns an RPC instance associated with this channel. @@ -870,11 +908,14 @@ # Closes AMQP channel. # # @api public def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) - super(reply_code, reply_text, class_id, method_id, &block) + r = super(reply_code, reply_text, class_id, method_id, &block) + self.class.release_channel_id(@id) + + r end # @endgroup @@ -1088,32 +1129,71 @@ # Overrides superclass method to also re-create @channel_is_open_deferrable # # @api plugin # @private - def handle_connection_interruption(exception = nil) - super(exception) + def handle_connection_interruption(reason = nil) + super(reason) + + self.class.release_channel_id(@id) unless auto_recovering? @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new end # @private # @api private def self.channel_id_mutex @channel_id_mutex ||= Mutex.new end - # Returns incrementing channel id. This method is thread safe. + # Returns next available channel id. This method is thread safe. + # # @return [Fixnum] # @api public + # @see Channel.release_channel_id + # @see Channel.reset_channel_id_allocator def self.next_channel_id channel_id_mutex.synchronize do - @last_channel_id ||= 0 - @last_channel_id += 1 + self.initialize_channel_id_allocator - @last_channel_id + @int_allocator.allocate end end + + # Releases previously allocated channel id. This method is thread safe. + # + # @param [Fixnum] Channel id to release + # @api public + # @see Channel.next_channel_id + # @see Channel.reset_channel_id_allocator + def self.release_channel_id(i) + channel_id_mutex.synchronize do + self.initialize_channel_id_allocator + + @int_allocator.release(i) + end + end # self.release_channel_id(i) + + # Resets channel allocator. This method is thread safe. + # @api public + # @see Channel.next_channel_id + # @see Channel.release_channel_id + def self.reset_channel_id_allocator + channel_id_mutex.synchronize do + initialize_channel_id_allocator + + @int_allocator.reset + end + end # self.reset_channel_id_allocator + + + # @private + def self.initialize_channel_id_allocator + # TODO: ideally, this should be in agreement with agreed max number of channels of the connection, + # but it is possible that value either not yet available. MK. + max_channel = (1 << 16) - 1 + @int_allocator ||= IntAllocator.new(1, max_channel) + end # self.initialize_channel_id_allocator # @private # @api plugin def register_rpc(rpc) raise ArgumentError, "argument is nil!" unless rpc