lib/amqp/channel.rb in amqp-1.1.5 vs lib/amqp/channel.rb in amqp-1.1.6

- old
+ new

@@ -229,23 +229,23 @@ # @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP channel.open-ok) instance # # # @see AMQP::Channel#prefetch # @api public - def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block) + def initialize(connection = nil, id = nil, options = {}, &block) raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running? @connection = connection || AMQP.connection || AMQP.start # this means 2nd argument is options if id.kind_of?(Hash) options = options.merge(id) - id = self.class.next_channel_id + id = @connection.next_channel_id end super(@connection) - @id = id + @id = id || @connection.next_channel_id @exchanges = Hash.new @queues = Hash.new @consumers = Hash.new @options = { :auto_recovery => @connection.auto_recovering? }.merge(options) @auto_recovery = (!!@options[:auto_recovery]) @@ -262,12 +262,12 @@ @connection.channel_max || 65536 else 65536 end - if channel_max != 0 && !(0..channel_max).include?(id) - raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{id}") + if channel_max != 0 && !(0..channel_max).include?(@id) + raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{@id}") end # we need this deferrable to mimic what AMQP gem 0.7 does to enable # the following (pseudo-synchronous) style of programming some people use in their # existing codebases: @@ -342,11 +342,11 @@ def reuse old_id = @id # must release after we allocate a new id, otherwise we will end up # with the same value. MK. @id = self.class.next_channel_id - self.class.release_channel_id(old_id) + @connection.release_channel_id(old_id) @channel_is_open_deferrable.fail @channel_is_open_deferrable = AMQP::Deferrable.new self.open do @@ -1172,71 +1172,14 @@ @exchanges.each { |name, e| e.handle_connection_interruption(method) } self.exec_callback_yielding_self(:after_connection_interruption) self.reset_state! - self.class.release_channel_id(@id) unless auto_recovering? + @connection.release_channel_id(@id) unless auto_recovering? @channel_is_open_deferrable = AMQP::Deferrable.new end - - # @private - # @api private - def self.channel_id_mutex - @channel_id_mutex ||= Mutex.new - end - - # Returns next available channel id. This method is thread safe. - # - # @return [Fixnum] - # @api public - # @see Channel.release_channel_id - # @see Channel.reset_channel_id_allocator - def self.next_channel_id - channel_id_mutex.synchronize do - self.initialize_channel_id_allocator - - @int_allocator.allocate - end - end - - # Releases previously allocated channel id. This method is thread safe. - # - # @param [Fixnum] Channel id to release - # @api public - # @see Channel.next_channel_id - # @see Channel.reset_channel_id_allocator - def self.release_channel_id(i) - channel_id_mutex.synchronize do - self.initialize_channel_id_allocator - - @int_allocator.release(i) - end - end # self.release_channel_id(i) - - # Resets channel allocator. This method is thread safe. - # @api public - # @see Channel.next_channel_id - # @see Channel.release_channel_id - def self.reset_channel_id_allocator - channel_id_mutex.synchronize do - initialize_channel_id_allocator - - @int_allocator.reset - end - end # self.reset_channel_id_allocator - - - # @private - def self.initialize_channel_id_allocator - # TODO: ideally, this should be in agreement with agreed max number of channels of the connection, - # but it is possible that value either not yet available. MK. - max_channel = (1 << 16) - 1 - @int_allocator ||= IntAllocator.new(1, max_channel) - end # self.initialize_channel_id_allocator - - # @return [Boolean] true if this channel uses automatic recovery mode def auto_recovering? @auto_recovery end # auto_recovering? @@ -1532,10 +1475,10 @@ def handle_close_ok(close_ok) self.status = :closed self.connection.clear_frames_on(self.id) self.exec_callback_once_yielding_self(:close, close_ok) - self.class.release_channel_id(@id) + @connection.release_channel_id(@id) end # @api plugin # @private def handle_close(channel_close)