lib/amqp/channel.rb in amqp-1.1.5 vs lib/amqp/channel.rb in amqp-1.1.6
- old
+ new
@@ -229,23 +229,23 @@
# @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP channel.open-ok) instance
#
#
# @see AMQP::Channel#prefetch
# @api public
- def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
+ def initialize(connection = nil, id = nil, options = {}, &block)
raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?
@connection = connection || AMQP.connection || AMQP.start
# this means 2nd argument is options
if id.kind_of?(Hash)
options = options.merge(id)
- id = self.class.next_channel_id
+ id = @connection.next_channel_id
end
super(@connection)
- @id = id
+ @id = id || @connection.next_channel_id
@exchanges = Hash.new
@queues = Hash.new
@consumers = Hash.new
@options = { :auto_recovery => @connection.auto_recovering? }.merge(options)
@auto_recovery = (!!@options[:auto_recovery])
@@ -262,12 +262,12 @@
@connection.channel_max || 65536
else
65536
end
- if channel_max != 0 && !(0..channel_max).include?(id)
- raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{id}")
+ if channel_max != 0 && !(0..channel_max).include?(@id)
+ raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{@id}")
end
# we need this deferrable to mimic what AMQP gem 0.7 does to enable
# the following (pseudo-synchronous) style of programming some people use in their
# existing codebases:
@@ -342,11 +342,11 @@
def reuse
old_id = @id
# must release after we allocate a new id, otherwise we will end up
# with the same value. MK.
@id = self.class.next_channel_id
- self.class.release_channel_id(old_id)
+ @connection.release_channel_id(old_id)
@channel_is_open_deferrable.fail
@channel_is_open_deferrable = AMQP::Deferrable.new
self.open do
@@ -1172,71 +1172,14 @@
@exchanges.each { |name, e| e.handle_connection_interruption(method) }
self.exec_callback_yielding_self(:after_connection_interruption)
self.reset_state!
- self.class.release_channel_id(@id) unless auto_recovering?
+ @connection.release_channel_id(@id) unless auto_recovering?
@channel_is_open_deferrable = AMQP::Deferrable.new
end
-
- # @private
- # @api private
- def self.channel_id_mutex
- @channel_id_mutex ||= Mutex.new
- end
-
- # Returns next available channel id. This method is thread safe.
- #
- # @return [Fixnum]
- # @api public
- # @see Channel.release_channel_id
- # @see Channel.reset_channel_id_allocator
- def self.next_channel_id
- channel_id_mutex.synchronize do
- self.initialize_channel_id_allocator
-
- @int_allocator.allocate
- end
- end
-
- # Releases previously allocated channel id. This method is thread safe.
- #
- # @param [Fixnum] Channel id to release
- # @api public
- # @see Channel.next_channel_id
- # @see Channel.reset_channel_id_allocator
- def self.release_channel_id(i)
- channel_id_mutex.synchronize do
- self.initialize_channel_id_allocator
-
- @int_allocator.release(i)
- end
- end # self.release_channel_id(i)
-
- # Resets channel allocator. This method is thread safe.
- # @api public
- # @see Channel.next_channel_id
- # @see Channel.release_channel_id
- def self.reset_channel_id_allocator
- channel_id_mutex.synchronize do
- initialize_channel_id_allocator
-
- @int_allocator.reset
- end
- end # self.reset_channel_id_allocator
-
-
- # @private
- def self.initialize_channel_id_allocator
- # TODO: ideally, this should be in agreement with agreed max number of channels of the connection,
- # but it is possible that value either not yet available. MK.
- max_channel = (1 << 16) - 1
- @int_allocator ||= IntAllocator.new(1, max_channel)
- end # self.initialize_channel_id_allocator
-
-
# @return [Boolean] true if this channel uses automatic recovery mode
def auto_recovering?
@auto_recovery
end # auto_recovering?
@@ -1532,10 +1475,10 @@
def handle_close_ok(close_ok)
self.status = :closed
self.connection.clear_frames_on(self.id)
self.exec_callback_once_yielding_self(:close, close_ok)
- self.class.release_channel_id(@id)
+ @connection.release_channel_id(@id)
end
# @api plugin
# @private
def handle_close(channel_close)