lib/amqp/channel.rb in amqp-0.8.0.rc2 vs lib/amqp/channel.rb in amqp-0.8.0.rc3

- old
+ new

@@ -2,26 +2,58 @@ require "amqp/exchange" require "amqp/queue" module AMQP - # To quote {AMQP 0.9.1 specification http://bit.ly/hw2ELX}: + # h2. What are AMQP channels # + # To quote {http://bit.ly/hw2ELX AMQP 0.9.1 specification}: + # # AMQP is a multi-channelled protocol. Channels provide a way to multiplex # a heavyweight TCP/IP connection into several light weight connections. # This makes the protocol more “firewall friendly” since port usage is predictable. # It also means that traffic shaping and other network QoS features can be easily employed. # Channels are independent of each other and can perform different functions simultaneously # with other channels, the available bandwidth being shared between the concurrent activities. # + # h2. Opening a channel # - # h2. RabbitMQ extensions. + # *Channels are opened asynchronously*. There are two ways to do it: using a callback or pseudo-synchronous mode. # - # AMQP gem supports several RabbitMQ extensions taht extend Channel functionality. - # Learn more in {file:docs/VendorSpecificExtensions.textile} + # @example Opening a channel with a callback + # # this assumes EventMachine reactor is running + # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client| + # AMQP::Channel.new(client) do |channel, open_ok| + # # when this block is executed, channel is open and ready for use + # end + # end # + # <script src="https://gist.github.com/939480.js?file=gistfile1.rb"></script> # + # Unless your application needs multiple channels, this approach is recommended. Alternatively, + # AMQP::Channel can be instantiated without a block. Then returned channel is not immediately open, + # however, it can be used as if it was a synchronous, blocking method: + # + # @example Instantiating a channel that will be open eventually + # # this assumes EventMachine reactor is running + # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client| + # channel = AMQP::Channel.new(client) + # exchange = channel.default_exchange + # + # # ... + # end + # + # <script src="https://gist.github.com/939482.js?file=gistfile1.rb"></script> + # + # Even though in the example above channel isn't immediately open, it is safe to declare exchanges using + # it. Exchange declaration will be delayed until after channel is open. Same applies to queue declaration + # and other operations on exchanges and queues. Library methods that rely on channel being open will be + # enqueued and executed in a FIFO manner when broker confirms channel opening. + # Note, however, that *this "pseudo-synchronous mode" is easy to abuse and introduce race conditions AMQP gem + # cannot resolve for you*. AMQP is an inherently asynchronous protocol and AMQP gem embraces this fact. + # + # # h2. Key methods # # Key methods of Channel class are # # * {Channel#queue} @@ -29,22 +61,86 @@ # * {Channel#direct} # * {Channel#fanout} # * {Channel#topic} # * {Channel#close} # + # refer to documentation for those methods for usage examples. + # # Channel provides a number of convenience methods that instantiate queues and exchanges # of various types associated with this channel: # # * {Channel#queue} # * {Channel#default_exchange} # * {Channel#direct} # * {Channel#fanout} # * {Channel#topic} # + # + # h2. Error handling + # + # It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error: + # + # @example Queue declaration with incompatible attributes results in a channel-level exception + # AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok| + # AMQP::Channel.new do |channel, open_ok| + # puts "Channel ##{channel.id} is now open!" + # + # channel.on_error do |ch, close| + # puts "Handling channel-level exception" + # + # connection.close { + # EM.stop { exit } + # } + # end + # + # EventMachine.add_timer(0.4) do + # # these two definitions result in a race condition. For sake of this example, + # # however, it does not matter. Whatever definition succeeds first, 2nd one will + # # cause a channel-level exception (because attributes are not identical) + # AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue| + # puts "#{queue.name} is ready to go" + # end + # + # AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue| + # puts "#{queue.name} is ready to go" + # end + # end + # end + # end + # + # <script src="https://gist.github.com/939490.js?file=gistfile1.rb"></script> + # + # When channel-level exception is indicated by the broker and errback defined using #on_error is run, channel is already + # closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from + # channel-level exceptions is to open a new channel and re-instantiate queues, exchanges and bindings your application + # needs. + # + # + # + # h2. Closing a channel + # # Channels are opened when objects is instantiated and closed using {#close} method when application no longer # needs it. # + # @example Closing a channel your application no longer needs + # # this assumes EventMachine reactor is running + # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client| + # AMQP::Channel.new(client) do |channel, open_ok| + # channel.close do |close_ok| + # # when this block is executed, channel is successfully closed + # end + # end + # end + # + # <script src="https://gist.github.com/939483.js?file=gistfile1.rb"></script> + # + # + # h2. RabbitMQ extensions. + # + # AMQP gem supports several RabbitMQ extensions taht extend Channel functionality. + # Learn more in {file:docs/VendorSpecificExtensions.textile} + # # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5) class Channel < AMQ::Client::Channel # # API @@ -61,45 +157,57 @@ # @note We encourage you to not rely on default AMQP connection and pass connection parameter # explicitly. # - # @param [AMQ::Client::EventMachineAdapter] Connection to open this channel on. If not given, default AMQP - # connection (accessible via {AMQP.connection}) will be used. - # @param [Integer] Channel id. Must not be greater than max channel id client and broker - # negotiated on during connection setup. Almost always the right thing to do - # is to let AMQP gem pick channel identifier for you. + # @param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP + # connection (accessible via {AMQP.connection}) will be used. + # @param [Integer] id Channel id. Must not be greater than max channel id client and broker + # negotiated on during connection setup. Almost always the right thing to do + # is to let AMQP gem pick channel identifier for you. If you want to get next + # channel id, use {AMQP::Channel.next_channel_id} (it is thread-safe). + # @param [Hash] options A hash of options # # @example Instantiating a channel for default connection (accessible as AMQP.connection) # # AMQP.connect do |connection| - # AMQP::Channel.new(connection) do |channel| + # AMQP::Channel.new(connection) do |channel, open_ok| # # channel is ready: set up your messaging flow by creating exchanges, # # queues, binding them together and so on. # end # end # # @example Instantiating a channel for explicitly given connection # # AMQP.connect do |connection| - # AMQP::Channel.new(connection) do |channel| - # # channel is ready: set up your messaging flow by creating exchanges, - # # queues, binding them together and so on. + # AMQP::Channel.new(connection) do |channel, open_ok| + # # ... # end # end # + # @example Instantiating a channel with a :prefetch option # + # AMQP.connect do |connection| + # AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :prefetch => 5) do |channel, open_ok| + # # ... + # end + # end + # + # + # @option options [Boolean] :prefetch (nil) Specifies number of messages to prefetch. Channel-specific. See {AMQP::Channel#prefetch}. + # # @yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional. # @yieldparam [Channel] channel Channel that is successfully open # @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP channel.open-ok) instance # # + # @see AMQP::Channel#prefetch # @api public - def initialize(connection = nil, id = self.class.next_channel_id, &block) + def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block) raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running? - @connection = connection || AMQP.start + @connection = connection || AMQP.connection || AMQP.start super(@connection, id) @rpcs = Hash.new # we need this deferrable to mimic what AMQP gem 0.7 does to enable @@ -125,18 +233,28 @@ case block.arity when 1 then block.call(ch) else block.call(ch, open_ok) end # case end # if + + self.prefetch(options[:prefetch], false) if options[:prefetch] end # self.open end # @connection.on_open end - + # Takes a block that will be deferred till the moment when channel is considered open + # (channel.open-ok is received from the broker). If you need to delay an operation + # till the moment channel is open, this method is what you are looking for. + # + # Multiple callbacks are supported. If when this moment is called, channel is already + # open, block is executed immediately. + # + # @api public def once_open(&block) @channel_is_open_deferrable.callback(&block) end # once_open(&block) + alias once_opened once_open # Defines, intializes and returns a direct Exchange instance. # # Learn more about direct exchanges in {Exchange Exchange class documentation}. @@ -596,39 +714,45 @@ validate_parameters_match!(queue, extended_opts) queue else - queue = if block.nil? - Queue.new(self, name, opts) - else - shim = Proc.new { |q, method| - queue = find_queue(method.queue) - if block.arity == 1 - block.call(queue) - else - block.call(queue, method.consumer_count, method.message_count) - end - } - Queue.new(self, name, opts, &shim) - end - - register_queue(queue) + self.queue!(name, opts, &block) end end # Returns true if channel is not closed. # @return [Boolean] # @api public def open? self.status == :opened || self.status == :opening end # open? - + # Same as {Channel#queue} but when queue with the same name already exists in this channel + # object's cache, this method will replace existing queue with a newly defined one. Consider + # using {Channel#queue} instead. + # + # @see Channel#queue + # + # @return [Queue] + # @api public def queue!(name, opts = {}, &block) - # TODO - raise NotImplementedError.new + queue = if block.nil? + Queue.new(self, name, opts) + else + shim = Proc.new { |q, method| + queue = find_queue(method.queue) + if block.arity == 1 + block.call(queue) + else + block.call(queue, method.consumer_count, method.message_count) + end + } + Queue.new(self, name, opts, &shim) + end + + register_queue(queue) end # Instantiates and returns an RPC instance associated with this channel. # @@ -641,11 +765,11 @@ # marshalling is subject to the same restrictions as defined in the # Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard # library. See that documentation for further reference. # # When the optional object is not passed, the returned rpc reference is - # used to send messages and arguments to the queue. See #method_missing + # used to send messages and arguments to the queue. See {AMQP::RPC#method_missing} # which does all of the heavy lifting with the proxy. Some client # elsewhere must call this method *with* the optional block so that # there is a valid destination. Failure to do so will just enqueue # marshalled messages that are never consumed. # @@ -660,29 +784,23 @@ def rpc(name, obj = nil) RPC.new(self, name, obj) end - # Define a callback to be run on channel-level exception. - # - # @param [String] msg Error message - # - # @api public - def self.error(msg = nil, &block) - # TODO - raise NotImplementedError.new - end - # @param [Fixnum] size + + # @param [Fixnum] Message count # @param [Boolean] global (false) # # @return [Channel] self # # @api public - def prefetch(size, global = false, &block) - # RabbitMQ as of 2.3.1 does not support prefetch_size. - self.qos(0, size, global, &block) + def prefetch(count, global = false, &block) + self.once_open do + # RabbitMQ as of 2.3.1 does not support prefetch_size. + self.qos(0, count, global, &block) + end self end @@ -701,31 +819,91 @@ # # Implementation # + # Defines a global callback to be run on channel-level exception across + # all channels. Consider using Channel#on_error instead. This method is here for sake + # of backwards compatibility with 0.6.x and 0.7.x releases. + # + # @param [String] msg Error message that passed to previously defined handler + # + # @deprecated + # @api public + # @private + def self.error(msg = nil, &block) + if block + @global_error_handler = block + else + @global_error_handler.call(msg) if @global_error_handler && msg + end + end + + # Defines a global callback to be run on channel-level exception across + # all channels. Consider using Channel#on_error instead. This method is here for sake + # of backwards compatibility with 0.6.x and 0.7.x releases. + # @see AMQP::Channel#on_error + # @deprecated + # @api public + def self.on_error(&block) + self.error(&block) + end # self.on_error(&block) + + # Overrides AMQ::Client::Channel version to also call global callback + # (if defined) for backwards compatibility. + # + # @private + # @api private + def handle_close(_, exception = nil) + super(_, exception) + + self.class.error(exception.message) + end + + # Resets channel state (for example, list of registered queue objects and so on). # # Most of the time, this method is not # called by application code. # # @private # @api plugin def reset - # TODO - raise NotImplementedError.new + # See AMQ::Client::Channel + self.reset_state! + + # there is no way to reset a deferrable; we have to use a new instance. MK. + @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new end # @private + # @api plugin + def reset_state! + super + @rpcs = Hash.new + end # reset_state! + + + # Overrides superclass method to also re-create @channel_is_open_deferrable + # + # @api plugin + # @private + def handle_connection_interruption(exception = nil) + super(exception) + @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + end + + + # @private # @api private def self.channel_id_mutex @channel_id_mutex ||= Mutex.new end + # Returns incrementing channel id. This method is thread safe. # @return [Fixnum] - # @private - # @api private + # @api public def self.next_channel_id channel_id_mutex.synchronize do @last_channel_id ||= 0 @last_channel_id += 1 @@ -748,9 +926,10 @@ end protected + # @private def validate_parameters_match!(entity, parameters) unless entity.opts == parameters || parameters[:passive] raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters) end end # validate_parameters_match!(entity, parameters)