lib/amq/client/channel.rb in amq-client-0.7.0.alpha34 vs lib/amq/client/channel.rb in amq-client-0.7.0.alpha35
- old
+ new
@@ -1,383 +1,11 @@
# encoding: utf-8
-require "amq/client/entity"
-require "amq/client/queue"
-require "amq/client/exchange"
+require "amq/client/async/channel"
module AMQ
module Client
- class Channel
-
- #
- # Behaviors
- #
-
- extend RegisterEntityMixin
- include Entity
- extend ProtocolMethodHandlers
-
- register_entity :queue, AMQ::Client::Queue
- register_entity :exchange, AMQ::Client::Exchange
-
- #
- # API
- #
-
-
- class ChannelOutOfBadError < StandardError # TODO: inherit from some AMQP error class defined in amq-protocol or use it straight away.
- def initialize(max, given)
- super("Channel max is #{max}, #{given} given.")
- end
- end
-
-
- DEFAULT_REPLY_TEXT = "Goodbye".freeze
-
- attr_reader :id
-
- attr_reader :exchanges_awaiting_declare_ok, :exchanges_awaiting_delete_ok
- attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_consume_ok, :queues_awaiting_cancel_ok, :queues_awaiting_get_response
-
- attr_accessor :flow_is_active
-
-
- def initialize(connection, id)
- super(connection)
-
- @id = id
- @exchanges = Hash.new
- @queues = Hash.new
- @consumers = Hash.new
-
- reset_state!
-
- # 65536 is here for cases when channel is opened without passing a callback in,
- # otherwise channel_mix would be nil and it causes a lot of needless headaches.
- # lets just have this default. MK.
- channel_max = if @connection.open?
- @connection.channel_max || 65536
- else
- 65536
- end
-
- if channel_max != 0 && !(0..channel_max).include?(id)
- raise ChannelOutOfBadError.new(channel_max, id)
- end
- end
-
- def consumers
- @consumers
- end # consumers
-
- # @return [Array<Queue>] Collection of queues that were declared on this channel.
- def queues
- @queues.values
- end
-
- # @return [Array<Exchange>] Collection of exchanges that were declared on this channel.
- def exchanges
- @exchanges.values
- end
-
-
- # AMQP connection this channel belongs to.
- #
- # @return [AMQ::Client::Connection] Connection this channel belongs to.
- def connection
- @connection
- end # connection
-
-
- # @group Channel lifecycle
-
- # Opens AMQP channel.
- #
- # @api public
- def open(&block)
- @connection.send_frame(Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING))
- @connection.channels[@id] = self
- self.status = :opening
-
- self.redefine_callback :open, &block
- end
-
- # Closes AMQP channel.
- #
- # @api public
- def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
- @connection.send_frame(Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id))
-
- self.redefine_callback :close, &block
- end
-
- # @endgroup
-
-
-
- # @group Message acknowledgements
-
- # Acknowledge one or all messages on the channel.
- #
- # @api public
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
- def acknowledge(delivery_tag, multiple = false)
- @connection.send_frame(Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple))
-
- self
- end # acknowledge(delivery_tag, multiple = false)
-
- # Reject a message with given delivery tag.
- #
- # @api public
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
- def reject(delivery_tag, requeue = true)
- @connection.send_frame(Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue))
-
- self
- end # reject(delivery_tag, requeue = true)
-
- # Notifies AMQ broker that consumer has recovered and unacknowledged messages need
- # to be redelivered.
- #
- # @return [Channel] self
- #
- # @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)
- # @api public
- def recover(requeue = true, &block)
- @connection.send_frame(Protocol::Basic::Recover.encode(@id, requeue))
-
- self.redefine_callback :recover, &block
- self
- end # recover(requeue = false, &block)
-
- # @endgroup
-
-
-
- # @group QoS and flow handling
-
- # Requests a specific quality of service. The QoS can be specified for the current channel
- # or for all channels on the connection.
- #
- # @note RabbitMQ as of 2.3.1 does not support prefetch_size.
- # @api public
- def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)
- @connection.send_frame(Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global))
-
- self.redefine_callback :qos, &block
- self
- end # qos(prefetch_size = 4096, prefetch_count = 32, global = false, &block)
-
- # Asks the peer to pause or restart the flow of content data sent to a consumer.
- # This is a simple flowcontrol mechanism that a peer can use to avoid overflowing its
- # queues or otherwise finding itself receiving more messages than it can process. Note that
- # this method is not intended for window control. It does not affect contents returned to
- # Queue#get callers.
- #
- # @param [Boolean] active Desired flow state.
- #
- # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
- # @api public
- def flow(active = false, &block)
- @connection.send_frame(Protocol::Channel::Flow.encode(@id, active))
-
- self.redefine_callback :flow, &block
- self
- end # flow(active = false, &block)
-
- # @return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel).
- #
- # @api public
- def flow_is_active?
- @flow_is_active
- end # flow_is_active?
-
- # @endgroup
-
-
-
- # @group Transactions
-
- # Sets the channel to use standard transactions. One must use this method at least
- # once on a channel before using #tx_tommit or tx_rollback methods.
- #
- # @api public
- def tx_select(&block)
- @connection.send_frame(Protocol::Tx::Select.encode(@id))
-
- self.redefine_callback :tx_select, &block
- self
- end # tx_select(&block)
-
- # Commits AMQP transaction.
- #
- # @api public
- def tx_commit(&block)
- @connection.send_frame(Protocol::Tx::Commit.encode(@id))
-
- self.redefine_callback :tx_commit, &block
- self
- end # tx_commit(&block)
-
- # Rolls AMQP transaction back.
- #
- # @api public
- def tx_rollback(&block)
- @connection.send_frame(Protocol::Tx::Rollback.encode(@id))
-
- self.redefine_callback :tx_rollback, &block
- self
- end # tx_rollback(&block)
-
- # @endgroup
-
-
-
- # @group Error handling
-
- # Defines a callback that will be executed when channel is closed after
- # channel-level exception.
- #
- # @api public
- def on_error(&block)
- self.define_callback(:error, &block)
- end
-
- # @endgroup
-
-
- #
- # Implementation
- #
-
- def register_exchange(exchange)
- raise ArgumentError, "argument is nil!" if exchange.nil?
-
- @exchanges[exchange.name] = exchange
- end # register_exchange(exchange)
-
- # Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if
- # it was previously instantiated on this channel.
- #
- # @param [String] name Exchange name
- # @return [AMQ::Client::Exchange] Exchange (if found)
- # @api plugin
- def find_exchange(name)
- @exchanges[name]
- end
-
- def register_queue(queue)
- raise ArgumentError, "argument is nil!" if queue.nil?
-
- @queues[queue.name] = queue
- end # register_queue(queue)
-
- def find_queue(name)
- @queues[name]
- end
-
-
- def reset_state!
- @flow_is_active = true
-
- @queues_awaiting_declare_ok = Array.new
- @exchanges_awaiting_declare_ok = Array.new
-
- @queues_awaiting_delete_ok = Array.new
-
- @exchanges_awaiting_delete_ok = Array.new
- @queues_awaiting_purge_ok = Array.new
- @queues_awaiting_bind_ok = Array.new
- @queues_awaiting_unbind_ok = Array.new
- @queues_awaiting_consume_ok = Array.new
- @queues_awaiting_cancel_ok = Array.new
-
- @queues_awaiting_get_response = Array.new
-
- @callbacks = Hash.new
- end # reset_state!
-
-
- def handle_connection_interruption(method = nil)
- self.reset_state!
- end # handle_connection_interruption
-
-
-
- def handle_open_ok(open_ok)
- self.status = :opened
- self.exec_callback_once_yielding_self(:open, open_ok)
- end
-
- def handle_close_ok(close_ok)
- self.status = :closed
- self.exec_callback_once_yielding_self(:close, close_ok)
- end
-
- def handle_close(channel_close)
- self.status = :closed
- self.exec_callback_yielding_self(:error, channel_close)
-
- self.handle_connection_interruption(channel_close)
- end
-
-
-
- self.handle(Protocol::Channel::OpenOk) do |connection, frame|
- channel = connection.channels[frame.channel]
- channel.handle_open_ok(frame.decode_payload)
- end
-
- self.handle(Protocol::Channel::CloseOk) do |connection, frame|
- method = frame.decode_payload
- channels = connection.channels
-
- channel = channels[frame.channel]
- channels.delete(channel)
- channel.handle_close_ok(method)
- end
-
- self.handle(Protocol::Channel::Close) do |connection, frame|
- method = frame.decode_payload
- channels = connection.channels
- channel = channels[frame.channel]
-
- channel.handle_close(method)
- end
-
- self.handle(Protocol::Basic::QosOk) do |connection, frame|
- channel = connection.channels[frame.channel]
- channel.exec_callback(:qos, frame.decode_payload)
- end
-
- self.handle(Protocol::Basic::RecoverOk) do |connection, frame|
- channel = connection.channels[frame.channel]
- channel.exec_callback(:recover, frame.decode_payload)
- end
-
- self.handle(Protocol::Channel::FlowOk) do |connection, frame|
- channel = connection.channels[frame.channel]
- method = frame.decode_payload
-
- channel.flow_is_active = method.active
- channel.exec_callback(:flow, method)
- end
-
- self.handle(Protocol::Tx::SelectOk) do |connection, frame|
- channel = connection.channels[frame.channel]
- channel.exec_callback(:tx_select, frame.decode_payload)
- end
-
- self.handle(Protocol::Tx::CommitOk) do |connection, frame|
- channel = connection.channels[frame.channel]
- channel.exec_callback(:tx_commit, frame.decode_payload)
- end
-
- self.handle(Protocol::Tx::RollbackOk) do |connection, frame|
- channel = connection.channels[frame.channel]
- channel.exec_callback(:tx_rollback, frame.decode_payload)
- end
- end # Channel
+ # backwards compatibility
+ # @private
+ Channel = Async::Channel
end # Client
end # AMQ