lib/amqp/channel.rb in amqp-0.8.0.rc2 vs lib/amqp/channel.rb in amqp-0.8.0.rc3
- old
+ new
@@ -2,26 +2,58 @@
require "amqp/exchange"
require "amqp/queue"
module AMQP
- # To quote {AMQP 0.9.1 specification http://bit.ly/hw2ELX}:
+ # h2. What are AMQP channels
#
+ # To quote {http://bit.ly/hw2ELX AMQP 0.9.1 specification}:
+ #
# AMQP is a multi-channelled protocol. Channels provide a way to multiplex
# a heavyweight TCP/IP connection into several light weight connections.
# This makes the protocol more “firewall friendly” since port usage is predictable.
# It also means that traffic shaping and other network QoS features can be easily employed.
# Channels are independent of each other and can perform different functions simultaneously
# with other channels, the available bandwidth being shared between the concurrent activities.
#
+ # h2. Opening a channel
#
- # h2. RabbitMQ extensions.
+ # *Channels are opened asynchronously*. There are two ways to do it: using a callback or pseudo-synchronous mode.
#
- # AMQP gem supports several RabbitMQ extensions taht extend Channel functionality.
- # Learn more in {file:docs/VendorSpecificExtensions.textile}
+ # @example Opening a channel with a callback
+ # # this assumes EventMachine reactor is running
+ # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
+ # AMQP::Channel.new(client) do |channel, open_ok|
+ # # when this block is executed, channel is open and ready for use
+ # end
+ # end
#
+ # <script src="https://gist.github.com/939480.js?file=gistfile1.rb"></script>
#
+ # Unless your application needs multiple channels, this approach is recommended. Alternatively,
+ # AMQP::Channel can be instantiated without a block. Then returned channel is not immediately open,
+ # however, it can be used as if it was a synchronous, blocking method:
+ #
+ # @example Instantiating a channel that will be open eventually
+ # # this assumes EventMachine reactor is running
+ # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
+ # channel = AMQP::Channel.new(client)
+ # exchange = channel.default_exchange
+ #
+ # # ...
+ # end
+ #
+ # <script src="https://gist.github.com/939482.js?file=gistfile1.rb"></script>
+ #
+ # Even though in the example above channel isn't immediately open, it is safe to declare exchanges using
+ # it. Exchange declaration will be delayed until after channel is open. Same applies to queue declaration
+ # and other operations on exchanges and queues. Library methods that rely on channel being open will be
+ # enqueued and executed in a FIFO manner when broker confirms channel opening.
+ # Note, however, that *this "pseudo-synchronous mode" is easy to abuse and introduce race conditions AMQP gem
+ # cannot resolve for you*. AMQP is an inherently asynchronous protocol and AMQP gem embraces this fact.
+ #
+ #
# h2. Key methods
#
# Key methods of Channel class are
#
# * {Channel#queue}
@@ -29,22 +61,86 @@
# * {Channel#direct}
# * {Channel#fanout}
# * {Channel#topic}
# * {Channel#close}
#
+ # refer to documentation for those methods for usage examples.
+ #
# Channel provides a number of convenience methods that instantiate queues and exchanges
# of various types associated with this channel:
#
# * {Channel#queue}
# * {Channel#default_exchange}
# * {Channel#direct}
# * {Channel#fanout}
# * {Channel#topic}
#
+ #
+ # h2. Error handling
+ #
+ # It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error:
+ #
+ # @example Queue declaration with incompatible attributes results in a channel-level exception
+ # AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
+ # AMQP::Channel.new do |channel, open_ok|
+ # puts "Channel ##{channel.id} is now open!"
+ #
+ # channel.on_error do |ch, close|
+ # puts "Handling channel-level exception"
+ #
+ # connection.close {
+ # EM.stop { exit }
+ # }
+ # end
+ #
+ # EventMachine.add_timer(0.4) do
+ # # these two definitions result in a race condition. For sake of this example,
+ # # however, it does not matter. Whatever definition succeeds first, 2nd one will
+ # # cause a channel-level exception (because attributes are not identical)
+ # AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue|
+ # puts "#{queue.name} is ready to go"
+ # end
+ #
+ # AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue|
+ # puts "#{queue.name} is ready to go"
+ # end
+ # end
+ # end
+ # end
+ #
+ # <script src="https://gist.github.com/939490.js?file=gistfile1.rb"></script>
+ #
+ # When channel-level exception is indicated by the broker and errback defined using #on_error is run, channel is already
+ # closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from
+ # channel-level exceptions is to open a new channel and re-instantiate queues, exchanges and bindings your application
+ # needs.
+ #
+ #
+ #
+ # h2. Closing a channel
+ #
# Channels are opened when objects is instantiated and closed using {#close} method when application no longer
# needs it.
#
+ # @example Closing a channel your application no longer needs
+ # # this assumes EventMachine reactor is running
+ # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
+ # AMQP::Channel.new(client) do |channel, open_ok|
+ # channel.close do |close_ok|
+ # # when this block is executed, channel is successfully closed
+ # end
+ # end
+ # end
+ #
+ # <script src="https://gist.github.com/939483.js?file=gistfile1.rb"></script>
+ #
+ #
+ # h2. RabbitMQ extensions.
+ #
+ # AMQP gem supports several RabbitMQ extensions taht extend Channel functionality.
+ # Learn more in {file:docs/VendorSpecificExtensions.textile}
+ #
# @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5)
class Channel < AMQ::Client::Channel
#
# API
@@ -61,45 +157,57 @@
# @note We encourage you to not rely on default AMQP connection and pass connection parameter
# explicitly.
#
- # @param [AMQ::Client::EventMachineAdapter] Connection to open this channel on. If not given, default AMQP
- # connection (accessible via {AMQP.connection}) will be used.
- # @param [Integer] Channel id. Must not be greater than max channel id client and broker
- # negotiated on during connection setup. Almost always the right thing to do
- # is to let AMQP gem pick channel identifier for you.
+ # @param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP
+ # connection (accessible via {AMQP.connection}) will be used.
+ # @param [Integer] id Channel id. Must not be greater than max channel id client and broker
+ # negotiated on during connection setup. Almost always the right thing to do
+ # is to let AMQP gem pick channel identifier for you. If you want to get next
+ # channel id, use {AMQP::Channel.next_channel_id} (it is thread-safe).
+ # @param [Hash] options A hash of options
#
# @example Instantiating a channel for default connection (accessible as AMQP.connection)
#
# AMQP.connect do |connection|
- # AMQP::Channel.new(connection) do |channel|
+ # AMQP::Channel.new(connection) do |channel, open_ok|
# # channel is ready: set up your messaging flow by creating exchanges,
# # queues, binding them together and so on.
# end
# end
#
# @example Instantiating a channel for explicitly given connection
#
# AMQP.connect do |connection|
- # AMQP::Channel.new(connection) do |channel|
- # # channel is ready: set up your messaging flow by creating exchanges,
- # # queues, binding them together and so on.
+ # AMQP::Channel.new(connection) do |channel, open_ok|
+ # # ...
# end
# end
#
+ # @example Instantiating a channel with a :prefetch option
#
+ # AMQP.connect do |connection|
+ # AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :prefetch => 5) do |channel, open_ok|
+ # # ...
+ # end
+ # end
+ #
+ #
+ # @option options [Boolean] :prefetch (nil) Specifies number of messages to prefetch. Channel-specific. See {AMQP::Channel#prefetch}.
+ #
# @yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional.
# @yieldparam [Channel] channel Channel that is successfully open
# @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP channel.open-ok) instance
#
#
+ # @see AMQP::Channel#prefetch
# @api public
- def initialize(connection = nil, id = self.class.next_channel_id, &block)
+ def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?
- @connection = connection || AMQP.start
+ @connection = connection || AMQP.connection || AMQP.start
super(@connection, id)
@rpcs = Hash.new
# we need this deferrable to mimic what AMQP gem 0.7 does to enable
@@ -125,18 +233,28 @@
case block.arity
when 1 then block.call(ch)
else block.call(ch, open_ok)
end # case
end # if
+
+ self.prefetch(options[:prefetch], false) if options[:prefetch]
end # self.open
end # @connection.on_open
end
-
+ # Takes a block that will be deferred till the moment when channel is considered open
+ # (channel.open-ok is received from the broker). If you need to delay an operation
+ # till the moment channel is open, this method is what you are looking for.
+ #
+ # Multiple callbacks are supported. If when this moment is called, channel is already
+ # open, block is executed immediately.
+ #
+ # @api public
def once_open(&block)
@channel_is_open_deferrable.callback(&block)
end # once_open(&block)
+ alias once_opened once_open
# Defines, intializes and returns a direct Exchange instance.
#
# Learn more about direct exchanges in {Exchange Exchange class documentation}.
@@ -596,39 +714,45 @@
validate_parameters_match!(queue, extended_opts)
queue
else
- queue = if block.nil?
- Queue.new(self, name, opts)
- else
- shim = Proc.new { |q, method|
- queue = find_queue(method.queue)
- if block.arity == 1
- block.call(queue)
- else
- block.call(queue, method.consumer_count, method.message_count)
- end
- }
- Queue.new(self, name, opts, &shim)
- end
-
- register_queue(queue)
+ self.queue!(name, opts, &block)
end
end
# Returns true if channel is not closed.
# @return [Boolean]
# @api public
def open?
self.status == :opened || self.status == :opening
end # open?
-
+ # Same as {Channel#queue} but when queue with the same name already exists in this channel
+ # object's cache, this method will replace existing queue with a newly defined one. Consider
+ # using {Channel#queue} instead.
+ #
+ # @see Channel#queue
+ #
+ # @return [Queue]
+ # @api public
def queue!(name, opts = {}, &block)
- # TODO
- raise NotImplementedError.new
+ queue = if block.nil?
+ Queue.new(self, name, opts)
+ else
+ shim = Proc.new { |q, method|
+ queue = find_queue(method.queue)
+ if block.arity == 1
+ block.call(queue)
+ else
+ block.call(queue, method.consumer_count, method.message_count)
+ end
+ }
+ Queue.new(self, name, opts, &shim)
+ end
+
+ register_queue(queue)
end
# Instantiates and returns an RPC instance associated with this channel.
#
@@ -641,11 +765,11 @@
# marshalling is subject to the same restrictions as defined in the
# Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard
# library. See that documentation for further reference.
#
# When the optional object is not passed, the returned rpc reference is
- # used to send messages and arguments to the queue. See #method_missing
+ # used to send messages and arguments to the queue. See {AMQP::RPC#method_missing}
# which does all of the heavy lifting with the proxy. Some client
# elsewhere must call this method *with* the optional block so that
# there is a valid destination. Failure to do so will just enqueue
# marshalled messages that are never consumed.
#
@@ -660,29 +784,23 @@
def rpc(name, obj = nil)
RPC.new(self, name, obj)
end
- # Define a callback to be run on channel-level exception.
- #
- # @param [String] msg Error message
- #
- # @api public
- def self.error(msg = nil, &block)
- # TODO
- raise NotImplementedError.new
- end
- # @param [Fixnum] size
+
+ # @param [Fixnum] Message count
# @param [Boolean] global (false)
#
# @return [Channel] self
#
# @api public
- def prefetch(size, global = false, &block)
- # RabbitMQ as of 2.3.1 does not support prefetch_size.
- self.qos(0, size, global, &block)
+ def prefetch(count, global = false, &block)
+ self.once_open do
+ # RabbitMQ as of 2.3.1 does not support prefetch_size.
+ self.qos(0, count, global, &block)
+ end
self
end
@@ -701,31 +819,91 @@
#
# Implementation
#
+ # Defines a global callback to be run on channel-level exception across
+ # all channels. Consider using Channel#on_error instead. This method is here for sake
+ # of backwards compatibility with 0.6.x and 0.7.x releases.
+ #
+ # @param [String] msg Error message that passed to previously defined handler
+ #
+ # @deprecated
+ # @api public
+ # @private
+ def self.error(msg = nil, &block)
+ if block
+ @global_error_handler = block
+ else
+ @global_error_handler.call(msg) if @global_error_handler && msg
+ end
+ end
+
+ # Defines a global callback to be run on channel-level exception across
+ # all channels. Consider using Channel#on_error instead. This method is here for sake
+ # of backwards compatibility with 0.6.x and 0.7.x releases.
+ # @see AMQP::Channel#on_error
+ # @deprecated
+ # @api public
+ def self.on_error(&block)
+ self.error(&block)
+ end # self.on_error(&block)
+
+ # Overrides AMQ::Client::Channel version to also call global callback
+ # (if defined) for backwards compatibility.
+ #
+ # @private
+ # @api private
+ def handle_close(_, exception = nil)
+ super(_, exception)
+
+ self.class.error(exception.message)
+ end
+
+
# Resets channel state (for example, list of registered queue objects and so on).
#
# Most of the time, this method is not
# called by application code.
#
# @private
# @api plugin
def reset
- # TODO
- raise NotImplementedError.new
+ # See AMQ::Client::Channel
+ self.reset_state!
+
+ # there is no way to reset a deferrable; we have to use a new instance. MK.
+ @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
end
# @private
+ # @api plugin
+ def reset_state!
+ super
+ @rpcs = Hash.new
+ end # reset_state!
+
+
+ # Overrides superclass method to also re-create @channel_is_open_deferrable
+ #
+ # @api plugin
+ # @private
+ def handle_connection_interruption(exception = nil)
+ super(exception)
+ @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+ end
+
+
+ # @private
# @api private
def self.channel_id_mutex
@channel_id_mutex ||= Mutex.new
end
+ # Returns incrementing channel id. This method is thread safe.
# @return [Fixnum]
- # @private
- # @api private
+ # @api public
def self.next_channel_id
channel_id_mutex.synchronize do
@last_channel_id ||= 0
@last_channel_id += 1
@@ -748,9 +926,10 @@
end
protected
+ # @private
def validate_parameters_match!(entity, parameters)
unless entity.opts == parameters || parameters[:passive]
raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters)
end
end # validate_parameters_match!(entity, parameters)