lib/amqp/channel.rb in amqp-1.1.0.pre1 vs lib/amqp/channel.rb in amqp-1.1.0.pre2
- old
+ new
@@ -139,13 +139,25 @@
#
# AMQP gem supports several RabbitMQ extensions that extend Channel functionality.
# Learn more in {file:docs/VendorSpecificExtensions.textile}
#
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.2.5)
- class Channel < AMQ::Client::Channel
+ class Channel
#
+ # Behaviours
+ #
+
+ extend RegisterEntityMixin
+ include Entity
+ extend ProtocolMethodHandlers
+
+ register_entity :queue, AMQP::Queue
+ register_entity :exchange, AMQP::Exchange
+
+
+ #
# API
#
# AMQP connection this channel is part of
# @return [Connection]
@@ -154,11 +166,29 @@
# Status of this channel (one of: :opening, :closing, :open, :closed)
# @return [Symbol]
attr_reader :status
+ DEFAULT_REPLY_TEXT = "Goodbye".freeze
+ attr_reader :id
+
+ attr_reader :exchanges_awaiting_declare_ok, :exchanges_awaiting_delete_ok
+ attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_get_response
+ attr_reader :consumers_awaiting_consume_ok, :consumers_awaiting_cancel_ok
+
+ attr_accessor :flow_is_active
+
+ # Change publisher index. Publisher index is incremented
+ # by 1 after each Basic.Publish starting at 1. This is done
+ # on both client and server, hence this acknowledged messages
+ # can be matched via its delivery-tag.
+ #
+ # @api private
+ attr_writer :publisher_index
+
+
# @param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP
# connection (accessible via {AMQP.connection}) will be used.
# @param [Integer] id Channel id. Must not be greater than max channel id client and broker
# negotiated on during connection setup. Almost always the right thing to do
# is to let AMQP gem pick channel identifier for you. If you want to get next
@@ -209,12 +239,37 @@
if id.kind_of?(Hash)
options = options.merge(id)
id = self.class.next_channel_id
end
- super(@connection, id, options)
+ super(@connection)
+ @id = id
+ @exchanges = Hash.new
+ @queues = Hash.new
+ @consumers = Hash.new
+ @options = { :auto_recovery => @connection.auto_recovering? }.merge(options)
+ @auto_recovery = (!!@options[:auto_recovery])
+
+ # we must synchronize frameset delivery. MK.
+ @mutex = Mutex.new
+
+ reset_state!
+
+ # 65536 is here for cases when channel is opened without passing a callback in,
+ # otherwise channel_mix would be nil and it causes a lot of needless headaches.
+ # lets just have this default. MK.
+ channel_max = if @connection.open?
+ @connection.channel_max || 65536
+ else
+ 65536
+ end
+
+ if channel_max != 0 && !(0..channel_max).include?(id)
+ raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{id}")
+ end
+
# we need this deferrable to mimic what AMQP gem 0.7 does to enable
# the following (pseudo-synchronous) style of programming some people use in their
# existing codebases:
#
# connection = AMQP.connect
@@ -222,11 +277,11 @@
# queue = AMQP::Queue.new(channel)
#
# ...
#
# Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
- @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+ @channel_is_open_deferrable = AMQP::Deferrable.new
@parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]}
# only send channel.open when connection is actually open. Makes it possible to
# do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
@@ -261,11 +316,11 @@
# @api plugin
def auto_recover
return unless auto_recovering?
@channel_is_open_deferrable.fail
- @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+ @channel_is_open_deferrable = AMQP::Deferrable.new
self.open do
@channel_is_open_deferrable.succeed
# re-establish prefetch
@@ -290,11 +345,11 @@
# with the same value. MK.
@id = self.class.next_channel_id
self.class.release_channel_id(old_id)
@channel_is_open_deferrable.fail
- @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+ @channel_is_open_deferrable = AMQP::Deferrable.new
self.open do
@channel_is_open_deferrable.succeed
# re-establish prefetch
@@ -818,17 +873,17 @@
def queue!(name, opts = {}, &block)
queue = if block.nil?
Queue.new(self, name, opts)
else
shim = Proc.new { |q, method|
- if block.arity == 1
- block.call(q)
- else
- queue = find_queue(method.queue)
- block.call(queue, method.consumer_count, method.message_count)
- end
- }
+ if block.arity == 1
+ block.call(q)
+ else
+ queue = find_queue(method.queue)
+ block.call(queue, method.consumer_count, method.message_count)
+ end
+ }
Queue.new(self, name, opts, &shim)
end
register_queue(queue)
end
@@ -850,13 +905,19 @@
# Opens AMQP channel.
#
# @note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
# @api public
def open(&block)
- super(&block)
+ @connection.send_frame(AMQ::Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING))
+ @connection.channels[@id] = self
+ self.status = :opening
+
+ self.redefine_callback :open, &block
end
+ alias reopen open
+
# @return [Boolean] true if channel is not closed.
# @api public
def open?
self.status == :opened || self.status == :opening
end # open?
@@ -887,13 +948,13 @@
# Closes AMQP channel.
#
# @api public
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
self.status = :closing
- r = super(reply_code, reply_text, class_id, method_id, &block)
+ @connection.send_frame(AMQ::Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id))
- r
+ self.redefine_callback :close, &block
end
# @endgroup
@@ -910,11 +971,14 @@
# @param [Boolean] Desired flow state.
#
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
# @api public
def flow(active = false, &block)
- super(active, &block)
+ @connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
+
+ self.redefine_callback :flow, &block
+ self
end
# @return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel).
#
# @api public
@@ -950,21 +1014,29 @@
# @api public
# @see #reject
# @see #recover
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
def acknowledge(delivery_tag, multiple = false)
- super(delivery_tag, multiple)
+ @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple))
+
+ self
end # acknowledge(delivery_tag, multiple = false)
# Reject a message with given delivery tag.
#
# @api public
# @see #acknowledge
# @see #recover
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
- def reject(delivery_tag, requeue = true)
- super(delivery_tag, requeue)
+ def reject(delivery_tag, requeue = true, multi = false)
+ if multi
+ @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(self.id, delivery_tag, multi, requeue))
+ else
+ @connection.send_frame(AMQ::Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue))
+ end
+
+ self
end # reject(delivery_tag, requeue = true)
# Notifies AMQ broker that consumer has recovered and unacknowledged messages need
# to be redelivered.
#
@@ -973,11 +1045,14 @@
# @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)
# @see #acknowledge
# @api public
def recover(requeue = true, &block)
- super(requeue, &block)
+ @connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
+
+ self.redefine_callback :recover, &block
+ self
end # recover(requeue = false, &block)
# @endgroup
@@ -988,25 +1063,34 @@
# Sets the channel to use standard transactions. One must use this method at least
# once on a channel before using #tx_tommit or tx_rollback methods.
#
# @api public
def tx_select(&block)
- super(&block)
+ @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
+
+ self.redefine_callback :tx_select, &block
+ self
end # tx_select(&block)
# Commits AMQP transaction.
#
# @api public
def tx_commit(&block)
- super(&block)
+ @connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
+
+ self.redefine_callback :tx_commit, &block
+ self
end # tx_commit(&block)
# Rolls AMQP transaction back.
#
# @api public
def tx_rollback(&block)
- super(&block)
+ @connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
+
+ self.redefine_callback :tx_rollback, &block
+ self
end # tx_rollback(&block)
# @endgroup
@@ -1019,108 +1103,79 @@
# Defines a callback that will be executed when channel is closed after
# channel-level exception.
#
# @api public
def on_error(&block)
- super(&block)
+ self.define_callback(:error, &block)
end
# @endgroup
# @group Publisher Confirms
def confirm_select(nowait = false, &block)
self.once_open do
- super(nowait, &block)
+ if nowait && block
+ raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
end
- end
- # @endgroup
+ @uses_publisher_confirmations = true
+ reset_publisher_index!
+ self.redefine_callback(:confirm_select, &block) unless nowait
+ self.redefine_callback(:after_publish) do
+ increment_publisher_index!
+ end
+ @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, nowait))
- #
- # Implementation
- #
-
-
- # Defines a global callback to be run on channel-level exception across
- # all channels. Consider using Channel#on_error instead. This method is here for sake
- # of backwards compatibility with 0.6.x and 0.7.x releases.
- #
- # @param [String] msg Error message that passed to previously defined handler
- #
- # @deprecated
- # @api public
- # @private
- def self.error(msg = nil, &block)
- if block
- @global_error_handler = block
- else
- @global_error_handler.call(msg) if @global_error_handler && msg
+ self
end
end
+ # @endgroup
- # Overrides AMQ::Client::Channel version to also call global callback
- # (if defined) for backwards compatibility.
- #
- # @private
- # @api private
- def handle_close(method)
- super(method)
- self.class.error(method.reply_text)
- self.class.release_channel_id(@id)
- end
-
- # Overrides AMQ::Client::Channel version to also release the channel id
#
- # @private
- # @api private
- def handle_close_ok(method)
- super(method)
- self.class.release_channel_id(@id)
- end
+ # Implementation
+ #
+
# Resets channel state (for example, list of registered queue objects and so on).
#
# Most of the time, this method is not
# called by application code.
#
# @private
# @api plugin
def reset(&block)
- # See AMQ::Client::Channel
+ # See AMQP::Channel
self.reset_state!
# there is no way to reset a deferrable; we have to use a new instance. MK.
- @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+ @channel_is_open_deferrable = AMQP::Deferrable.new
@channel_is_open_deferrable.callback(&block)
@connection.on_connection do
@channel_is_open_deferrable.succeed
self.prefetch(@options[:prefetch], false) if @options[:prefetch]
end
end
- # @private
- # @api plugin
- def reset_state!
- super
- end # reset_state!
-
-
# Overrides superclass method to also re-create @channel_is_open_deferrable
#
# @api plugin
# @private
- def handle_connection_interruption(reason = nil)
- super(reason)
+ def handle_connection_interruption(method = nil)
+ @queues.each { |name, q| q.handle_connection_interruption(method) }
+ @exchanges.each { |name, e| e.handle_connection_interruption(method) }
+ self.exec_callback_yielding_self(:after_connection_interruption)
+ self.reset_state!
+
self.class.release_channel_id(@id) unless auto_recovering?
- @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+ @channel_is_open_deferrable = AMQP::Deferrable.new
end
# @private
# @api private
@@ -1175,9 +1230,395 @@
# but it is possible that value either not yet available. MK.
max_channel = (1 << 16) - 1
@int_allocator ||= IntAllocator.new(1, max_channel)
end # self.initialize_channel_id_allocator
+
+ # @return [Boolean] true if this channel uses automatic recovery mode
+ def auto_recovering?
+ @auto_recovery
+ end # auto_recovering?
+
+
+ # @return [Hash<String, Consumer>]
+ def consumers
+ @consumers
+ end # consumers
+
+ # @return [Array<Queue>] Collection of queues that were declared on this channel.
+ def queues
+ @queues.values
+ end
+
+ # @return [Array<Exchange>] Collection of exchanges that were declared on this channel.
+ def exchanges
+ @exchanges.values
+ end
+
+
+ # AMQP connection this channel belongs to.
+ #
+ # @return [AMQP::Connection] Connection this channel belongs to.
+ def connection
+ @connection
+ end # connection
+
+ # Synchronizes given block using this channel's mutex.
+ # @api public
+ def synchronize(&block)
+ @mutex.synchronize(&block)
+ end
+
+
+
+ # @group QoS and flow handling
+
+ # Requests a specific quality of service. The QoS can be specified for the current channel
+ # or for all channels on the connection.
+ #
+ # @note RabbitMQ as of 2.3.1 does not support prefetch_size.
+ # @api public
+ def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)
+ @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global))
+
+ self.redefine_callback :qos, &block
+ self
+ end # qos
+
+ # @endgroup
+
+
+
+ # @group Error handling
+
+
+ # Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ def on_connection_interruption(&block)
+ self.redefine_callback(:after_connection_interruption, &block)
+ end # on_connection_interruption(&block)
+ alias after_connection_interruption on_connection_interruption
+
+
+ # Defines a callback that will be executed after TCP connection has recovered after a network failure
+ # but before AMQP connection is re-opened.
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ def before_recovery(&block)
+ self.redefine_callback(:before_recovery, &block)
+ end # before_recovery(&block)
+
+ # @private
+ def run_before_recovery_callbacks
+ self.exec_callback_yielding_self(:before_recovery)
+
+ @queues.each { |name, q| q.run_before_recovery_callbacks }
+ @exchanges.each { |name, e| e.run_before_recovery_callbacks }
+ end
+
+
+
+ # Defines a callback that will be executed after AMQP connection has recovered after a network failure.
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ def on_recovery(&block)
+ self.redefine_callback(:after_recovery, &block)
+ end # on_recovery(&block)
+ alias after_recovery on_recovery
+
+ # @private
+ def run_after_recovery_callbacks
+ self.exec_callback_yielding_self(:after_recovery)
+
+ @queues.each { |name, q| q.run_after_recovery_callbacks }
+ @exchanges.each { |name, e| e.run_after_recovery_callbacks }
+ end
+
+
+ # Called by associated connection object when AMQP connection has been re-established
+ # (for example, after a network failure).
+ #
+ # @api plugin
+ def auto_recover
+ return unless auto_recovering?
+
+ self.open do
+ # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
+ @exchanges.each { |name, e| e.auto_recover }
+ @queues.each { |name, q| q.auto_recover }
+ end
+ end # auto_recover
+
+ # @endgroup
+
+
+ # Publisher index is an index of the last message since
+ # the confirmations were activated, started with 0. It's
+ # incremented by 1 every time a message is published.
+ # This is done on both client and server, hence this
+ # acknowledged messages can be matched via its delivery-tag.
+ #
+ # @return [Integer] Current publisher index.
+ # @api public
+ def publisher_index
+ @publisher_index ||= 0
+ end
+
+ # Resets publisher index to 0
+ #
+ # @api plugin
+ def reset_publisher_index!
+ @publisher_index = 0
+ end
+
+
+ # This method is executed after publishing of each message via {Exchage#publish}.
+ # Currently it just increments publisher index by 1, so messages
+ # can be actually matched.
+ #
+ # @api plugin
+ def increment_publisher_index!
+ @publisher_index += 1
+ end
+
+ # @return [Boolean]
+ def uses_publisher_confirmations?
+ @uses_publisher_confirmations
+ end # uses_publisher_confirmations?
+
+
+ # Turn on confirmations for this channel and, if given,
+ # register callback for basic.ack from the broker.
+ #
+ # @raise [RuntimeError] Occurs when confirmations are already activated.
+ # @raise [RuntimeError] Occurs when nowait is true and block is given.
+ # @param [Boolean] nowait Whether we expect Confirm.Select-Ok to be returned by the broker or not.
+ #
+ # @yield [basick_ack] Callback which will be executed every time we receive Basic.Ack from the broker.
+ # @yieldparam [AMQ::Protocol::Basic::Ack] basick_ack Protocol method class instance.
+ #
+ # @return [self] self.
+ def on_ack(nowait = false, &block)
+ self.define_callback(:ack, &block) if block
+
+ self
+ end
+
+
+ # Register error callback for Basic.Nack. It's called
+ # when message(s) is rejected.
+ #
+ # @return [self] self
+ def on_nack(&block)
+ self.define_callback(:nack, &block) if block
+
+ self
+ end
+
+
+
+
+ # Handler for Confirm.Select-Ok. By default, it just
+ # executes hook specified via the #confirmations method
+ # with a single argument, a protocol method class
+ # instance (an instance of AMQ::Protocol::Confirm::SelectOk)
+ # and then it deletes the callback, since Confirm.Select
+ # is supposed to be sent just once.
+ #
+ # @api plugin
+ def handle_select_ok(method)
+ self.exec_callback_once(:confirm_select, method)
+ end
+
+ # Handler for Basic.Ack. By default, it just
+ # executes hook specified via the #confirm method
+ # with a single argument, a protocol method class
+ # instance (an instance of AMQ::Protocol::Basic::Ack).
+ #
+ # @api plugin
+ def handle_basic_ack(method)
+ self.exec_callback(:ack, method)
+ end
+
+
+ # Handler for Basic.Nack. By default, it just
+ # executes hook specified via the #confirm_failed method
+ # with a single argument, a protocol method class
+ # instance (an instance of AMQ::Protocol::Basic::Nack).
+ #
+ # @api plugin
+ def handle_basic_nack(method)
+ self.exec_callback(:nack, method)
+ end
+
+
+
+ #
+ # Implementation
+ #
+
+ def register_exchange(exchange)
+ raise ArgumentError, "argument is nil!" if exchange.nil?
+
+ @exchanges[exchange.name] = exchange
+ end # register_exchange(exchange)
+
+ # Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if
+ # it was previously instantiated on this channel.
+ #
+ # @param [String] name Exchange name
+ # @return [AMQP::Exchange] Exchange (if found)
+ # @api plugin
+ def find_exchange(name)
+ @exchanges[name]
+ end
+
+ # @api plugin
+ # @private
+ def register_queue(queue)
+ raise ArgumentError, "argument is nil!" if queue.nil?
+
+ @queues[queue.name] = queue
+ end # register_queue(queue)
+
+ # @api plugin
+ # @private
+ def find_queue(name)
+ @queues[name]
+ end
+
+
+ RECOVERY_EVENTS = [:after_connection_interruption, :before_recovery, :after_recovery].freeze
+
+
+ # @api plugin
+ # @private
+ def reset_state!
+ @flow_is_active = true
+
+ @queues_awaiting_declare_ok = Array.new
+ @exchanges_awaiting_declare_ok = Array.new
+
+ @queues_awaiting_delete_ok = Array.new
+
+ @exchanges_awaiting_delete_ok = Array.new
+ @queues_awaiting_purge_ok = Array.new
+ @queues_awaiting_bind_ok = Array.new
+ @queues_awaiting_unbind_ok = Array.new
+ @consumers_awaiting_consume_ok = Array.new
+ @consumers_awaiting_cancel_ok = Array.new
+
+ @queues_awaiting_get_response = Array.new
+
+ @callbacks = @callbacks.delete_if { |k, v| !RECOVERY_EVENTS.include?(k) }
+ @uses_publisher_confirmations = false
+ end # reset_state!
+
+
+ # @api plugin
+ # @private
+ def handle_open_ok(open_ok)
+ self.status = :opened
+ self.exec_callback_once_yielding_self(:open, open_ok)
+ end
+
+ # @api plugin
+ # @private
+ def handle_close_ok(close_ok)
+ self.status = :closed
+ self.connection.clear_frames_on(self.id)
+ self.exec_callback_once_yielding_self(:close, close_ok)
+
+ self.class.release_channel_id(@id)
+ end
+
+ # @api plugin
+ # @private
+ def handle_close(channel_close)
+ self.status = :closed
+ self.connection.clear_frames_on(self.id)
+
+ self.exec_callback_yielding_self(:error, channel_close)
+ end
+
+
+
+ self.handle(AMQ::Protocol::Channel::OpenOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ channel.handle_open_ok(frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Channel::CloseOk) do |connection, frame|
+ method = frame.decode_payload
+ channels = connection.channels
+
+ channel = channels[frame.channel]
+ channels.delete(channel)
+ channel.handle_close_ok(method)
+ end
+
+ self.handle(AMQ::Protocol::Channel::Close) do |connection, frame|
+ method = frame.decode_payload
+ channels = connection.channels
+ channel = channels[frame.channel]
+ connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(frame.channel))
+ channel.handle_close(method)
+ end
+
+ self.handle(AMQ::Protocol::Basic::QosOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ channel.exec_callback(:qos, frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Basic::RecoverOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ channel.exec_callback(:recover, frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Channel::FlowOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ method = frame.decode_payload
+
+ channel.flow_is_active = method.active
+ channel.exec_callback(:flow, method)
+ end
+
+ self.handle(AMQ::Protocol::Tx::SelectOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ channel.exec_callback(:tx_select, frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Tx::CommitOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ channel.exec_callback(:tx_commit, frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Tx::RollbackOk) do |connection, frame|
+ channel = connection.channels[frame.channel]
+ channel.exec_callback(:tx_rollback, frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Confirm::SelectOk) do |connection, frame|
+ method = frame.decode_payload
+ channel = connection.channels[frame.channel]
+ channel.handle_select_ok(method)
+ end
+
+ self.handle(AMQ::Protocol::Basic::Ack) do |connection, frame|
+ method = frame.decode_payload
+ channel = connection.channels[frame.channel]
+ channel.handle_basic_ack(method)
+ end
+
+ self.handle(AMQ::Protocol::Basic::Nack) do |connection, frame|
+ method = frame.decode_payload
+ channel = connection.channels[frame.channel]
+ channel.handle_basic_nack(method)
+ end
protected
@private
def validate_parameters_match!(entity, parameters, type)