lib/amqp/channel.rb in amqp-0.7.5 vs lib/amqp/channel.rb in amqp-0.8.0.beta1

- old
+ new

@@ -1,655 +1,633 @@ # encoding: utf-8 -require "amqp/collection" +require "amqp/exchange" +require "amqp/queue" module AMQP - # The top-level class for building AMQP clients. This class contains several - # convenience methods for working with queues and exchanges. Many calls - # delegate/forward to subclasses, but this is the preferred API. The subclass - # API is subject to change while this high-level API will likely remain - # unchanged as the library evolves. All code examples will be written using - # the AMQP API. + # To quote {AMQP 0.9.1 specification http://bit.ly/hw2ELX}: # - # Below is a somewhat complex example that demonstrates several capabilities - # of the library. The example starts a clock using a +fanout+ exchange which - # is used for 1 to many communications. Each consumer generates a queue to - # receive messages and do some operation (in this case, print the time). - # One consumer prints messages every second while the second consumer prints - # messages every 2 seconds. After 5 seconds has elapsed, the 1 second - # consumer is deleted. + # AMQP is a multi-channelled protocol. Channels provide a way to multiplex + # a heavyweight TCP/IP connection into several light weight connections. + # This makes the protocol more “firewall friendly” since port usage is predictable. + # It also means that traffic shaping and other network QoS features can be easily employed. + # Channels are independent of each other and can perform different functions simultaneously + # with other channels, the available bandwidth being shared between the concurrent activities. # - # Of interest is the relationship of EventMachine to the process. All AMQP - # operations must occur within the context of an EM.run block. We start - # EventMachine in its own thread with an empty block; all subsequent calls - # to the AMQP API add their blocks to the EM.run block. This demonstrates how - # the library could be used to build up and tear down communications outside - # the context of an EventMachine block and/or integrate the library with - # other synchronous operations. See the EventMachine documentation for - # more information. # - # require 'rubygems' - # require 'mq' + # h2. RabbitMQ extensions. # - # thr = Thread.new { EM.run } + # AMQP gem supports several RabbitMQ extensions taht extend Channel functionality. + # Learn more in {file:docs/VendorSpecificExtensions.textile} # - # # turns on extreme logging - # #AMQP.logging = true # - # def log *args - # p args - # end + # h2. Key methods # - # def publisher - # clock = AMQP::Channel.fanout('clock') - # EM.add_periodic_timer(1) do - # puts + # Key methods of Channel class are # - # log :publishing, time = Time.now - # clock.publish(Marshal.dump(time)) - # end - # end + # * {Channel#queue} + # * {Channel#default_exchange} + # * {Channel#direct} + # * {Channel#fanout} + # * {Channel#topic} + # * {Channel#close} # - # def one_second_consumer - # AMQP::Channel.queue('every second').bind(AMQP::Channel.fanout('clock')).subscribe do |time| - # log 'every second', :received, Marshal.load(time) - # end - # end + # Channel provides a number of convenience methods that instantiate queues and exchanges + # of various types associated with this channel: # - # def two_second_consumer - # AMQP::Channel.queue('every 2 seconds').bind('clock').subscribe do |time| - # time = Marshal.load(time) - # log 'every 2 seconds', :received, time if time.sec % 2 == 0 - # end - # end + # * {Channel#queue} + # * {Channel#default_exchange} + # * {Channel#direct} + # * {Channel#fanout} + # * {Channel#topic} # - # def delete_one_second - # EM.add_timer(5) do - # # delete the 'every second' queue - # log 'Deleting [every second] queue' - # AMQP::Channel.queue('every second').delete - # end - # end + # Channels are opened when objects is instantiated and closed using {#close} method when application no longer + # needs it. # - # publisher - # one_second_consumer - # two_second_consumer - # delete_one_second - # thr.join - # - # __END__ - # - # [:publishing, Tue Jan 06 22:46:14 -0600 2009] - # ["every second", :received, Tue Jan 06 22:46:14 -0600 2009] - # ["every 2 seconds", :received, Tue Jan 06 22:46:14 -0600 2009] - # - # [:publishing, Tue Jan 06 22:46:16 -0600 2009] - # ["every second", :received, Tue Jan 06 22:46:16 -0600 2009] - # ["every 2 seconds", :received, Tue Jan 06 22:46:16 -0600 2009] - # - # [:publishing, Tue Jan 06 22:46:17 -0600 2009] - # ["every second", :received, Tue Jan 06 22:46:17 -0600 2009] - # - # [:publishing, Tue Jan 06 22:46:18 -0600 2009] - # ["every second", :received, Tue Jan 06 22:46:18 -0600 2009] - # ["every 2 seconds", :received, Tue Jan 06 22:46:18 -0600 2009] - # ["Deleting [every second] queue"] - # - # [:publishing, Tue Jan 06 22:46:19 -0600 2009] - # - # [:publishing, Tue Jan 06 22:46:20 -0600 2009] - # ["every 2 seconds", :received, Tue Jan 06 22:46:20 -0600 2009] - # - class Channel + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5) + class Channel < AMQ::Client::Channel # - # Behaviors + # API # - include EM::Deferrable + # AMQP connection this channel is part of + # @return [Connection] + attr_reader :connection + alias :conn :connection + # Status of this channel (one of: :opening, :closing, :open, :closed) + # @return [Symbol] + attr_reader :status + # @note We encourage you to not rely on default AMQP connection and pass connection parameter + # explicitly. # - # API + # @param [AMQ::Client::EventMachineAdapter] Connection to open this channel on. If not given, default AMQP + # connection (accessible via {AMQP.connection}) will be used. + # @param [Integer] Channel id. Must not be greater than max channel id client and broker + # negotiated on during connection setup. Almost always the right thing to do + # is to let AMQP gem pick channel identifier for you. # - - # Returns a new channel. A channel is a bidirectional virtual - # connection between the client and the AMQP server. Elsewhere in the - # library the channel is referred to in parameter lists as +mq+. + # @example Instantiating a channel for default connection (accessible as AMQP.connection) # - # Optionally takes the result from calling AMQP::connect. + # AMQP.connect do |connection| + # AMQP::Channel.new(connection) do |channel| + # # channel is ready: set up your messaging flow by creating exchanges, + # # queues, binding them together and so on. + # end + # end # - # Rarely called directly by client code. This is implicitly called - # by most instance methods. See #method_missing. + # @example Instantiating a channel for explicitly given connection # - # EM.run do - # channel = AMQP::Channel.new - # end + # AMQP.connect do |connection| + # AMQP::Channel.new(connection) do |channel| + # # channel is ready: set up your messaging flow by creating exchanges, + # # queues, binding them together and so on. + # end + # end # - # EM.run do - # channel = AMQP::Channel.new AMQP::connect - # end # - def initialize(connection = nil) + # @yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional. + # @yieldparam [Channel] channel Channel that is successfully open + # @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP channel.open-ok) instance + # + # + # @api public + def initialize(connection = nil, id = self.class.next_channel_id, &block) raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running? - @_send_mutex = Mutex.new - @get_queue_mutex = Mutex.new - @connection = connection || AMQP.start - @queues_awaiting_declare_ok = Array.new + super(@connection, id) - conn.callback { |c| - @channel = c.add_channel(self) - send Protocol::Channel::Open.new - } - end + @rpcs = Hash.new + # we need this deferrable to mimic what AMQP gem 0.7 does to enable + # the following (HIGHLY discouraged) style of programming some people use in their + # existing codebases: + # + # connection = AMQP.connect + # channel = AMQP::Channel.new(connection) + # queue = AMQP::Queue.new(channel) + # + # ... + # + # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK. + @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new - attr_reader :channel, :connection, :status - alias :conn :connection + # only send channel.open when connection is actually open. Makes it possible to + # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK. + @connection.on_open do + self.open do |*args| + @channel_is_open_deferrable.succeed - attr_reader :queues_awaiting_declare_ok + block.call(*args) if block + end + end + end + def once_open(&block) + @channel_is_open_deferrable.callback(&block) + end # once_open(&block) - def closed? - @status.eql?(:closed) - end - def open? - !self.closed? - end # open? - - # Defines, intializes and returns an Exchange to act as an ingress - # point for all published messages. + # Defines, intializes and returns a direct Exchange instance. # - # == Direct - # A direct exchange is useful for 1:1 communication between a publisher and - # subscriber. Messages are routed to the queue with a binding that shares - # the same name as the exchange. Alternately, the messages are routed to - # the bound queue that shares the same name as the routing key used for - # defining the exchange. This exchange type does not honor the +:key+ option - # when defining a new instance with a name. It _will_ honor the +:key+ option - # if the exchange name is the empty string. - # Allocating this exchange without a name _or_ with the empty string - # will use the internal 'amq.direct' exchange. + # Learn more about direct exchanges in {Exchange Exchange class documentation}. # - # Any published message, regardless of its persistence setting, is thrown - # away by the exchange when there are no queues bound to it. # - # # exchange is named 'foo' - # exchange = AMQP::Channel.direct('foo') + # @param [String] name (amq.direct) Exchange name. # - # # or, the exchange can use the default name (amq.direct) and perform - # # routing comparisons using the :key - # exchange = AMQP::Channel.direct("", :key => 'foo') - # exchange.publish('some data') # will be delivered to queue bound to 'foo' + # @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not + # already exist. The client can use this to check whether an exchange + # exists without modifying the server state. # - # queue = AMQP::Channel.queue('foo') - # # can receive data since the queue name and the exchange key match exactly - # queue.pop { |data| puts "received data [#{data}]" } + # @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as + # durable. Durable exchanges and their bindings are recreated upon a server + # restart (information about them is persisted). Non-durable (transient) exchanges + # do not survive if/when a server restarts (information about them is stored exclusively + # in RAM). # - # == Options - # * :passive => true | false (default false) - # If set, the server will not create the exchange if it does not - # already exist. The client can use this to check whether an exchange - # exists without modifying the server state. # - # * :durable => true | false (default false) - # If set when creating a new exchange, the exchange will be marked as - # durable. Durable exchanges remain active when a server restarts. - # Non-durable exchanges (transient exchanges) are purged if/when a - # server restarts. + # @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished + # using it. The server waits for a short period of time before + # determining the exchange is unused to give time to the client code + # to bind a queue to it. # - # A transient exchange (the default) is stored in memory-only. The - # exchange and all bindings will be lost on a server restart. - # It makes no sense to publish a persistent message to a transient - # exchange. + # @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but + # only when bound to other exchanges. Internal exchanges are used to + # construct wiring that is not visible to applications. This is a RabbitMQ-specific + # extension. # - # Durable exchanges and their bindings are recreated upon a server - # restart. Any published messages not routed to a bound queue are lost. + # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. # - # * :auto_delete => true | false (default false) - # If set, the exchange is deleted when all queues have finished - # using it. The server waits for a short period of time before - # determining the exchange is unused to give time to the client code - # to bind a queue to it. # - # If the exchange has been previously declared, this option is ignored - # on subsequent declarations. + # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. + # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. # - # * :internal => true | false (default false) - # If set, the exchange may not be used directly by publishers, but - # only when bound to other exchanges. Internal exchanges are used to - # construct wiring that is not visible to applications. # - # * :nowait => true | false (default true) - # If set, the server will not respond to the method. The client should - # not wait for a reply method. If the server could not complete the - # method it will raise a channel or connection exception. + # @example Using default pre-declared direct exchange and no callbacks (pseudo-synchronous style) # - # == Exceptions - # Doing any of these activities are illegal and will raise AMQP::Error. - # * redeclare an already-declared exchange to a different type - # * :passive => true and the exchange does not exist (NOT_FOUND) + # # an exchange application A will be using to publish updates + # # to some search index + # exchange = channel.direct("index.updates") # + # # In the same (or different) process declare a queue that broker will + # # generate name for, bind it to aforementioned exchange using method chaining + # queue = channel.queue(""). + # # queue will be receiving messages that were published with + # # :routing_key attribute value of "search.index.updates" + # bind(exchange, :routing_key => "search.index.updates"). + # # register a callback that will be run when messages arrive + # subscribe { |header, message| puts("Received #{message}") } + # + # # now publish a new document contents for indexing, + # # message will be delivered to the queue we declared and bound on the line above + # exchange.publish(document.content, :routing_key => "search.index.updates") + # + # + # @example Instantiating a direct exchange using {Channel#direct} with a callback + # + # AMQP.connect do |connection| + # AMQP::Channel.new(connection) do |channel| + # channel.direct("email.replies_listener") do |exchange, declare_ok| + # # by now exchange is ready and waiting + # end + # end + # end + # + # + # @see Channel#default_exchange + # @see Exchange + # @see Exchange#initialize + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.1) + # + # @return [Exchange] + # @api public def direct(name = 'amq.direct', opts = {}, &block) - if exchange = self.exchanges.find { |exchange| exchange.name == name } + if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:direct, name, opts, block) validate_parameters_match!(exchange, extended_opts) exchange else - self.exchanges << Exchange.new(self, :direct, name, opts, &block) + register_exchange(Exchange.new(self, :direct, name, opts, &block)) end end # Returns exchange object with the same name as default (aka unnamed) exchange. # Default exchange is a direct exchange and automatically routes messages to # queues when routing key matches queue name exactly. This feature is known as # "automatic binding" (of queues to default exchange). # # *Use default exchange when you want to route messages directly to specific queues* # (queue names are known, you don't mind this kind of coupling between applications). + # + # + # @example Using default exchange to publish messages to queues with known names + # AMQP.start(:host => 'localhost') do |connection| + # ch = AMQP::Channel.new(connection) + # + # queue1 = ch.queue("queue1").subscribe do |payload| + # puts "[#{queue1.name}] => #{payload}" + # end + # queue2 = ch.queue("queue2").subscribe do |payload| + # puts "[#{queue2.name}] => #{payload}" + # end + # queue3 = ch.queue("queue3").subscribe do |payload| + # puts "[#{queue3.name}] => #{payload}" + # end + # queues = [queue1, queue2, queue3] + # + # # Rely on default direct exchange binding, see section 2.1.2.4 Automatic Mode in AMQP 0.9.1 spec. + # exchange = AMQP::Exchange.default + # EM.add_periodic_timer(1) do + # q = queues.sample + # + # exchange.publish "Some payload from #{Time.now.to_i}", :routing_key => q.name + # end + # end + # + # + # + # @see Exchange + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.2.4) + # + # @return [Exchange] + # @api public def default_exchange Exchange.default(self) end - - # Defines, intializes and returns an Exchange to act as an ingress - # point for all published messages. + # Defines, intializes and returns a fanout Exchange instance. # - # == Fanout - # A fanout exchange is useful for 1:N communication where one publisher - # feeds multiple subscribers. Like direct exchanges, messages published - # to a fanout exchange are delivered to queues whose name matches the - # exchange name (or are bound to that exchange name). Each queue gets - # its own copy of the message. + # Learn more about fanout exchanges in {Exchange Exchange class documentation}. # - # Any published message, regardless of its persistence setting, is thrown - # away by the exchange when there are no queues bound to it. # - # Like the direct exchange type, this exchange type does not honor the - # +:key+ option when defining a new instance with a name. It _will_ honor - # the +:key+ option if the exchange name is the empty string. - # Allocating this exchange without a name _or_ with the empty string - # will use the internal 'amq.fanout' exchange. + # @param [String] name (amq.fanout) Exchange name. # - # EM.run do - # clock = AMQP::Channel.fanout('clock') - # EM.add_periodic_timer(1) do - # puts "\npublishing #{time = Time.now}" - # clock.publish(Marshal.dump(time)) - # end + # @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not + # already exist. The client can use this to check whether an exchange + # exists without modifying the server state. # - # amq = AMQP::Channel.queue('every second') - # amq.bind(AMQP::Channel.fanout('clock')).subscribe do |time| - # puts "every second received #{Marshal.load(time)}" - # end + # @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as + # durable. Durable exchanges and their bindings are recreated upon a server + # restart (information about them is persisted). Non-durable (transient) exchanges + # do not survive if/when a server restarts (information about them is stored exclusively + # in RAM). # - # # note the string passed to #bind - # AMQP::Channel.queue('every 5 seconds').bind('clock').subscribe do |time| - # time = Marshal.load(time) - # puts "every 5 seconds received #{time}" if time.strftime('%S').to_i%5 == 0 - # end - # end # - # == Options - # * :passive => true | false (default false) - # If set, the server will not create the exchange if it does not - # already exist. The client can use this to check whether an exchange - # exists without modifying the server state. + # @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished + # using it. The server waits for a short period of time before + # determining the exchange is unused to give time to the client code + # to bind a queue to it. # - # * :durable => true | false (default false) - # If set when creating a new exchange, the exchange will be marked as - # durable. Durable exchanges remain active when a server restarts. - # Non-durable exchanges (transient exchanges) are purged if/when a - # server restarts. + # @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but + # only when bound to other exchanges. Internal exchanges are used to + # construct wiring that is not visible to applications. This is a RabbitMQ-specific + # extension. # - # A transient exchange (the default) is stored in memory-only. The - # exchange and all bindings will be lost on a server restart. - # It makes no sense to publish a persistent message to a transient - # exchange. + # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. # - # Durable exchanges and their bindings are recreated upon a server - # restart. Any published messages not routed to a bound queue are lost. # - # * :auto_delete => true | false (default false) - # If set, the exchange is deleted when all queues have finished - # using it. The server waits for a short period of time before - # determining the exchange is unused to give time to the client code - # to bind a queue to it. + # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. + # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. # - # If the exchange has been previously declared, this option is ignored - # on subsequent declarations. # - # * :internal => true | false (default false) - # If set, the exchange may not be used directly by publishers, but - # only when bound to other exchanges. Internal exchanges are used to - # construct wiring that is not visible to applications. + # @example Using fanout exchange to deliver messages to multiple consumers # - # * :nowait => true | false (default true) - # If set, the server will not respond to the method. The client should - # not wait for a reply method. If the server could not complete the - # method it will raise a channel or connection exception. + # # open up a channel + # # declare a fanout exchange + # # declare 3 queues, binds them + # # publish a message # - # == Exceptions - # Doing any of these activities are illegal and will raise AMQP::Error. - # * redeclare an already-declared exchange to a different type - # * :passive => true and the exchange does not exist (NOT_FOUND) + # @see Exchange + # @see Exchange#initialize + # @see Channel#default_exchange + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.2) # + # @return [Exchange] + # @api public def fanout(name = 'amq.fanout', opts = {}, &block) - if exchange = self.exchanges.find { |exchange| exchange.name == name } + if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:fanout, name, opts, block) validate_parameters_match!(exchange, extended_opts) exchange else - self.exchanges << Exchange.new(self, :fanout, name, opts, &block) + register_exchange(Exchange.new(self, :fanout, name, opts, &block)) end end - # Defines, intializes and returns an Exchange to act as an ingress - # point for all published messages. + + # Defines, intializes and returns a topic Exchange instance. # - # == Topic - # A topic exchange allows for messages to be published to an exchange - # tagged with a specific routing key. The Exchange uses the routing key - # to determine which queues to deliver the message. Wildcard matching - # is allowed. The topic must be declared using dot notation to separate - # each subtopic. + # Learn more about topic exchanges in {Exchange Exchange class documentation}. # - # This is the only exchange type to honor the +key+ hash key for all - # cases. + # @param [String] name (amq.topic) Exchange name. # - # Any published message, regardless of its persistence setting, is thrown - # away by the exchange when there are no queues bound to it. # - # As part of the AMQP standard, each server _should_ predeclare a topic - # exchange called 'amq.topic' (this is not required by the standard). - # Allocating this exchange without a name _or_ with the empty string - # will use the internal 'amq.topic' exchange. + # @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not + # already exist. The client can use this to check whether an exchange + # exists without modifying the server state. # - # The classic example is delivering market data. When publishing market - # data for stocks, we may subdivide the stream based on 2 - # characteristics: nation code and trading symbol. The topic tree for - # Apple Computer would look like: - # 'stock.us.aapl' - # For a foreign stock, it may look like: - # 'stock.de.dax' + # @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as + # durable. Durable exchanges and their bindings are recreated upon a server + # restart (information about them is persisted). Non-durable (transient) exchanges + # do not survive if/when a server restarts (information about them is stored exclusively + # in RAM). # - # When publishing data to the exchange, bound queues subscribing to the - # exchange indicate which data interests them by passing a routing key - # for matching against the published routing key. # - # EM.run do - # exch = AMQP::Channel.topic("stocks") - # keys = ['stock.us.aapl', 'stock.de.dax'] + # @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished + # using it. The server waits for a short period of time before + # determining the exchange is unused to give time to the client code + # to bind a queue to it. # - # EM.add_periodic_timer(1) do # every second - # puts - # exch.publish(10+rand(10), :routing_key => keys[rand(2)]) - # end + # @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but + # only when bound to other exchanges. Internal exchanges are used to + # construct wiring that is not visible to applications. This is a RabbitMQ-specific + # extension. # - # # match against one dot-separated item - # AMQP::Channel.queue('us stocks').bind(exch, :key => 'stock.us.*').subscribe do |price| - # puts "us stock price [#{price}]" - # end + # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. # - # # match against multiple dot-separated items - # AMQP::Channel.queue('all stocks').bind(exch, :key => 'stock.#').subscribe do |price| - # puts "all stocks: price [#{price}]" - # end # - # # require exact match - # AMQP::Channel.queue('only dax').bind(exch, :key => 'stock.de.dax').subscribe do |price| - # puts "dax price [#{price}]" - # end - # end + # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. + # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. # - # For matching, the '*' (asterisk) wildcard matches against one - # dot-separated item only. The '#' wildcard (hash or pound symbol) - # matches against 0 or more dot-separated items. If none of these - # symbols are used, the exchange performs a comparison looking for an - # exact match. # - # == Options - # * :passive => true | false (default false) - # If set, the server will not create the exchange if it does not - # already exist. The client can use this to check whether an exchange - # exists without modifying the server state. + # @example Using topic exchange to deliver relevant news updates + # AMQP.connect do |connection| + # channel = AMQP::Channel.new(connection) + # exchange = channel.topic("pub/sub") # - # * :durable => true | false (default false) - # If set when creating a new exchange, the exchange will be marked as - # durable. Durable exchanges remain active when a server restarts. - # Non-durable exchanges (transient exchanges) are purged if/when a - # server restarts. + # # Subscribers. + # channel.queue("development").bind(exchange, :key => "technology.dev.#").subscribe do |payload| + # puts "A new dev post: '#{payload}'" + # end + # channel.queue("ruby").bind(exchange, :key => "technology.#.ruby").subscribe do |payload| + # puts "A new post about Ruby: '#{payload}'" + # end # - # A transient exchange (the default) is stored in memory-only. The - # exchange and all bindings will be lost on a server restart. - # It makes no sense to publish a persistent message to a transient - # exchange. + # # Let's publish some data. + # exchange.publish "Ruby post", :routing_key => "technology.dev.ruby" + # exchange.publish "Erlang post", :routing_key => "technology.dev.erlang" + # exchange.publish "Sinatra post", :routing_key => "technology.web.ruby" + # exchange.publish "Jewelery post", :routing_key => "jewelery.ruby" + # end # - # Durable exchanges and their bindings are recreated upon a server - # restart. Any published messages not routed to a bound queue are lost. # - # * :auto_delete => true | false (default false) - # If set, the exchange is deleted when all queues have finished - # using it. The server waits for a short period of time before - # determining the exchange is unused to give time to the client code - # to bind a queue to it. + # @example Using topic exchange to deliver geographically-relevant data + # AMQP.connect do |connection| + # channel = AMQP::Channel.new(connection) + # exchange = channel.topic("pub/sub") # - # If the exchange has been previously declared, this option is ignored - # on subsequent declarations. + # # Subscribers. + # channel.queue("americas.north").bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload| + # puts "An update for North America: #{payload}, routing key is #{headers.routing_key}" + # end + # channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload| + # puts "An update for South America: #{payload}, routing key is #{headers.routing_key}" + # end + # channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload| + # puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}" + # end + # channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload| + # puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}" + # end + # channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload| + # puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}" + # end + # channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload| + # puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}" + # end # - # * :internal => true | false (default false) - # If set, the exchange may not be used directly by publishers, but - # only when bound to other exchanges. Internal exchanges are used to - # construct wiring that is not visible to applications. + # exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego"). + # publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley"). + # publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco"). + # publish("New York update", :routing_key => "americas.north.us.ny.newyork"). + # publish("São Paolo update", :routing_key => "americas.south.brazil.saopaolo"). + # publish("Hong Kong update", :routing_key => "asia.southeast.hk.hongkong"). + # publish("Kyoto update", :routing_key => "asia.southeast.japan.kyoto"). + # publish("Shanghai update", :routing_key => "asia.southeast.prc.shanghai"). + # publish("Rome update", :routing_key => "europe.italy.roma"). + # publish("Paris update", :routing_key => "europe.france.paris") + # end # - # * :nowait => true | false (default true) - # If set, the server will not respond to the method. The client should - # not wait for a reply method. If the server could not complete the - # method it will raise a channel or connection exception. + # @see Exchange + # @see Exchange#initialize + # @see http://www.rabbitmq.com/faq.html#Binding-and-Routing RabbitMQ FAQ on routing & wildcards + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3) # - # == Exceptions - # Doing any of these activities are illegal and will raise AMQP::Error. - # * redeclare an already-declared exchange to a different type - # * :passive => true and the exchange does not exist (NOT_FOUND) - # + # @return [Exchange] + # @api public def topic(name = 'amq.topic', opts = {}, &block) - if exchange = self.exchanges.find { |exchange| exchange.name == name } + if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:topic, name, opts, block) validate_parameters_match!(exchange, extended_opts) exchange else - self.exchanges << Exchange.new(self, :topic, name, opts, &block) + register_exchange(Exchange.new(self, :topic, name, opts, &block)) end end - # Defines, intializes and returns an Exchange to act as an ingress - # point for all published messages. + + # Defines, intializes and returns a headers Exchange instance. # - # == Headers - # A headers exchange allows for messages to be published to an exchange + # Learn more about headers exchanges in {Exchange Exchange class documentation}. # - # Any published message, regardless of its persistence setting, is thrown - # away by the exchange when there are no queues bound to it. + # @param [String] name (amq.match) Exchange name. # - # As part of the AMQP standard, each server _should_ predeclare a headers - # exchange called 'amq.match' (this is not required by the standard). - # Allocating this exchange without a name _or_ with the empty string - # will use the internal 'amq.match' exchange. + # @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not + # already exist. The client can use this to check whether an exchange + # exists without modifying the server state. # - # TODO: The classic example is ... + # @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as + # durable. Durable exchanges and their bindings are recreated upon a server + # restart (information about them is persisted). Non-durable (transient) exchanges + # do not survive if/when a server restarts (information about them is stored exclusively + # in RAM). # - # When publishing data to the exchange, bound queues subscribing to the - # exchange indicate which data interests them by passing arguments - # for matching against the headers in published messages. The - # form of the matching can be controlled by the 'x-match' argument, which - # may be 'any' or 'all'. If unspecified (in RabbitMQ at least), it defaults - # to "all". # - # A value of 'all' for 'x-match' implies that all values must match (i.e. - # it does an AND of the headers ), while a value of 'any' implies that - # at least one should match (ie. it does an OR). + # @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished + # using it. The server waits for a short period of time before + # determining the exchange is unused to give time to the client code + # to bind a queue to it. # - # TODO: document behavior when either the binding or the message is missing - # a header present in the other + # @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but + # only when bound to other exchanges. Internal exchanges are used to + # construct wiring that is not visible to applications. This is a RabbitMQ-specific + # extension. # - # TODO: insert example + # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. # - # == Options - # * :passive => true | false (default false) - # If set, the server will not create the exchange if it does not - # already exist. The client can use this to check whether an exchange - # exists without modifying the server state. # - # * :durable => true | false (default false) - # If set when creating a new exchange, the exchange will be marked as - # durable. Durable exchanges remain active when a server restarts. - # Non-durable exchanges (transient exchanges) are purged if/when a - # server restarts. + # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. + # @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist. # - # A transient exchange (the default) is stored in memory-only. The - # exchange and all bindings will be lost on a server restart. - # It makes no sense to publish a persistent message to a transient - # exchange. # - # Durable exchanges and their bindings are recreated upon a server - # restart. Any published messages not routed to a bound queue are lost. + # @example Using fanout exchange to deliver messages to multiple consumers # - # * :auto_delete => true | false (default false) - # If set, the exchange is deleted when all queues have finished - # using it. The server waits for a short period of time before - # determining the exchange is unused to give time to the client code - # to bind a queue to it. + # # TODO # - # If the exchange has been previously declared, this option is ignored - # on subsequent declarations. # - # * :internal => true | false (default false) - # If set, the exchange may not be used directly by publishers, but - # only when bound to other exchanges. Internal exchanges are used to - # construct wiring that is not visible to applications. + # @see Exchange + # @see Exchange#initialize + # @see Channel#default_exchange + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3) # - # * :nowait => true | false (default true) - # If set, the server will not respond to the method. The client should - # not wait for a reply method. If the server could not complete the - # method it will raise a channel or connection exception. - # - # == Exceptions - # Doing any of these activities are illegal and will raise AMQP::Error. - # * redeclare an already-declared exchange to a different type - # * :passive => true and the exchange does not exist (NOT_FOUND) - # * using a value other than "any" or "all" for "x-match" + # @return [Exchange] + # @api public def headers(name = 'amq.match', opts = {}, &block) - if exchange = self.exchanges.find { |exchange| exchange.name == name } + if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:headers, name, opts, block) validate_parameters_match!(exchange, extended_opts) exchange else - self.exchanges << Exchange.new(self, :headers, name, opts, &block) + register_exchange(Exchange.new(self, :headers, name, opts, &block)) end end - # Queues store and forward messages. Queues can be configured in the server - # or created at runtime. Queues must be attached to at least one exchange - # in order to receive messages from publishers. + + # Declares and returns a Queue instance associated with this channel. See {Queue Queue class documentation} for + # more information about queues. # - # Like an Exchange, queue names starting with 'amq.' are reserved for - # internal use. Attempts to create queue names in violation of this - # reservation will raise AMQP::Error (ACCESS_REFUSED). + # To make broker generate queue name for you (a classic example is exclusive + # queues that are only used for a short period of time), pass empty string + # as name value. Then queue will get it's name as soon as broker's response + # (queue.declare-ok) arrives. Note that in this case, block is required. # - # It is not supported to create a queue without a name; some string - # (even the empty string) must be passed in the +name+ parameter. # - # == Options - # * :passive => true | false (default false) - # If set, the server will not create the queue if it does not - # already exist. The client can use this to check whether the queue - # exists without modifying the server state. + # Like for exchanges, queue names starting with 'amq.' cannot be modified and + # should not be used by applications. # - # * :durable => true | false (default false) - # If set when creating a new queue, the queue will be marked as - # durable. Durable queues remain active when a server restarts. - # Non-durable queues (transient queues) are purged if/when a - # server restarts. Note that durable queues do not necessarily - # hold persistent messages, although it does not make sense to - # send persistent messages to a transient queue (though it is - # allowed). + # @example Declaring a queue in a mail delivery app using Channel#queue without a block + # AMQP.connect do |connection| + # AMQP::Channel.new(connection) do |ch| + # # message producers will be able to send messages to this queue + # # using direct exchange and routing key = "mail.delivery" + # queue = ch.queue("mail.delivery", :durable => true) + # queue.subscribe do |headers, payload| + # # ... + # end + # end + # end # - # Again, note the durability property on a queue has no influence on - # the persistence of published messages. A durable queue containing - # transient messages will flush those messages on a restart. + # @example Declaring a server-named exclusive queue that receives all messages related to events, using a block. + # AMQP.connect do |connection| + # AMQP::Channel.new(connection) do |ch| + # # message producers will be able to send messages to this queue + # # using amq.topic exchange with routing keys that begin with "events" + # ch.queue("", :exclusive => true) do |queue| + # queue.bind(ch.exchange("amq.topic"), :routing_key => "events.#").subscribe do |headers, payload| + # # ... + # end + # end + # end + # end # - # If the queue has already been declared, any redeclaration will - # ignore this setting. A queue may only be declared durable the - # first time when it is created. + # @param [String] name Queue name. If you want a server-named queue, you can omit the name (note that in this case, using block is mandatory). + # See {Queue Queue class documentation} for discussion of queue lifecycles and when use of server-named queues + # is optimal. # - # * :exclusive => true | false (default false) - # Exclusive queues may only be consumed from by the current connection. - # Setting the 'exclusive' flag always implies 'auto-delete'. Only a - # single consumer is allowed to remove messages from this queue. + # @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not + # already exist. The client can use this to check whether an exchange + # exists without modifying the server state. # - # The default is a shared queue. Multiple clients may consume messages - # from this queue. + # @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as + # durable. Durable exchanges and their bindings are recreated upon a server + # restart (information about them is persisted). Non-durable (transient) exchanges + # do not survive if/when a server restarts (information about them is stored exclusively + # in RAM). Any remaining messages in the queue will be purged when the queue + # is deleted regardless of the message's persistence setting. # - # Attempting to redeclare an already-declared queue as :exclusive => true - # will raise AMQP::Error. # - # * :auto_delete = true | false (default false) - # If set, the queue is deleted when all consumers have finished - # using it. Last consumer can be cancelled either explicitly or because - # its channel is closed. If there was no consumer ever on the queue, it - # won't be deleted. + # @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished + # using it. The server waits for a short period of time before + # determining the exchange is unused to give time to the client code + # to bind a queue to it. # - # The server waits for a short period of time before - # determining the queue is unused to give time to the client code - # to bind a queue to it. + # @option opts [Boolean] :exclusive (false) Exclusive queues may only be used by a single connection. + # Exclusivity also implies that queue is automatically deleted when connection + # is closed. Only one consumer is allowed to remove messages from exclusive queue. # - # If the queue has been previously declared, this option is ignored - # on subsequent declarations. + # @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. # - # Any remaining messages in the queue will be purged when the queue - # is deleted regardless of the message's persistence setting. # - # * :nowait => true | false (default true) - # If set, the server will not respond to the method. The client should - # not wait for a reply method. If the server could not complete the - # method it will raise a channel or connection exception. + # @raise [AMQP::Error] Raised when queue is redeclared with parameters different from original declaration. + # @raise [AMQP::Error] Raised when queue is declared with :passive => true and the queue does not exist. + # @raise [AMQP::Error] Raised when queue is declared with :exclusive => true and queue with that name already exist. # - def queue(name, opts = {}, &block) - raise ArgumentError, "queue name must not be nil. Use '' (empty string) for server-named queues." if name.nil? - - if name && !name.empty? && (queue = self.queues.find { |queue| queue.name == name }) + # + # @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional. + # @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used. + # @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP queue.declare-ok) instance. + # + # @see Queue + # @see Queue#initialize + # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.4) + # + # @return [Queue] + # @api public + def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) + if name && !name.empty? && (queue = find_queue(name)) extended_opts = Queue.add_default_options(name, opts, block) validate_parameters_match!(queue, extended_opts) queue else - q = Queue.new(self, name, opts, &block) - self.queues << q + queue = if block.nil? + Queue.new(self, name, opts) + else + shim = Proc.new { |q, method| + queue = find_queue(method.queue) + if block.arity == 1 + block.call(queue) + else + block.call(queue, method.consumer_count, method.message_count) + end + } + Queue.new(self, name, opts, &shim) + end - q + register_queue(queue) end end + # Returns true if channel is not closed. + # @return [Boolean] + # @api public + def open? + self.status == :opened || self.status == :opening + end # open? + + def queue!(name, opts = {}, &block) - self.queues.add! Queue.new(self, name, opts, &block) + # TODO + raise NotImplementedError.new end - # Takes a channel, queue and optional object. + + # Instantiates and returns an RPC instance associated with this channel. # # The optional object may be a class name, module name or object # instance. When given a class or module name, the object is instantiated # during this setup. The passed queue is automatically subscribed to so # it passes all messages (and their arguments) to the object. @@ -664,319 +642,113 @@ # which does all of the heavy lifting with the proxy. Some client # elsewhere must call this method *with* the optional block so that # there is a valid destination. Failure to do so will just enqueue # marshalled messages that are never consumed. # - # EM.run do - # server = AMQP::Channel.new.rpc('hash table node', Hash) + # @example Use of RPC # - # client = AMQP::Channel.new.rpc('hash table node') - # client[:now] = Time.now - # client[:one] = 1 + # # TODO # - # client.values do |res| - # p 'client', :values => res - # end # - # client.keys do |res| - # p 'client', :keys => res - # EM.stop_event_loop - # end - # end - # + # @param [String, Queue] Queue to be used by RPC server. + # @return [RPC] + # @api public def rpc(name, obj = nil) - rpcs[name] ||= RPC.new(self, name, obj) + RPC.new(self, name, obj) end - def close(&block) - @on_close = block - if @deferred_status == :succeeded - send Protocol::Channel::Close.new(:reply_code => 200, - :reply_text => 'bye', - :method_id => 0, - :class_id => 0) - else - @closing = true - end - end - # Define a message and callback block to be executed on all - # errors. - def self.error msg = nil, &blk - if blk - @error_callback = blk - else - @error_callback.call(msg) if @error_callback and msg - end - end - - def prefetch(size) - @prefetch_size = size - - send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false) - - self - end - - # Asks the broker to redeliver all unacknowledged messages on this - # channel. + # Define a callback to be run on channel-level exception. # - # * requeue (default false) - # If this parameter is false, the message will be redelivered to the original recipient. - # If this flag is true, the server will attempt to requeue the message, potentially then - # delivering it to an alternative subscriber. + # @param [String] msg Error message # - def recover(requeue = false) - send Protocol::Basic::Recover.new(:requeue => requeue) - self + # @api public + def self.error(msg = nil, &block) + # TODO + raise NotImplementedError.new end - # Returns a hash of all the exchange proxy objects. + # @param [Fixnum] size + # @param [Boolean] global (false) # - # Not typically called by client code. - def exchanges - @exchanges ||= AMQP::Collection.new - end - - # Returns a hash of all the queue proxy objects. + # @return [Channel] self # - # Not typically called by client code. - def queues - @queues ||= AMQP::Collection.new - end + # @api public + def prefetch(size, global = false, &block) + # RabbitMQ as of 2.3.1 does not support prefetch_size. + self.qos(0, size, global, &block) - def get_queue - if block_given? - @get_queue_mutex.synchronize { - yield( @get_queue ||= [] ) - } - end + self end + + # Returns a hash of all rpc proxy objects. # - # Not typically called by client code. + # Most of the time, this method is not + # called by application code. + # @api plugin def rpcs - @rcps ||= {} + @rpcs.values end - # Queue objects keyed on their consumer tags. - # - # Not typically called by client code. - def consumers - @consumers ||= {} - end - def reset - @deferred_status = nil - @channel = nil - @queues_awaiting_declare_ok = Array.new - - initialize @connection - - @consumers = {} - - exs = @exchanges - @exchanges = AMQP::Collection.new - exs.each { |e| e.reset } if exs - - qus = @queues - @queues = AMQP::Collection.new - qus.each { |q| q.reset } if qus - - prefetch(@prefetch_size) if @prefetch_size - end - - # # Implementation # - # May raise a AMQP::Channel::Error exception when the frame payload contains a - # Protocol::Channel::Close object. + + # Resets channel state (for example, list of registered queue objects and so on). # - # This usually occurs when a client attempts to perform an illegal - # operation. A short, and incomplete, list of potential illegal operations - # follows: - # * publish a message to a deleted exchange (NOT_FOUND) - # * declare an exchange using the reserved 'amq.' naming structure (ACCESS_REFUSED) + # Most of the time, this method is not + # called by application code. # - def process_frame(frame) - log :received, frame + # @private + # @api plugin + def reset + # TODO + raise NotImplementedError.new + end - case frame - when Frame::Header - @header = frame.payload - @body = '' - check_content_completion + # @private + # @api private + def self.channel_id_mutex + @channel_id_mutex ||= Mutex.new + end - when Frame::Body - @body << frame.payload - check_content_completion + # @return [Fixnum] + # @private + # @api private + def self.next_channel_id + channel_id_mutex.synchronize do + @last_channel_id ||= 0 + @last_channel_id += 1 - when Frame::Method - handle_method(frame) + @last_channel_id end - end # process_frame + end + # @private + # @api plugin + def register_rpc(rpc) + raise ArgumentError, "argument is nil!" unless rpc - def send(*args) - conn.callback { |c| - @_send_mutex.synchronize do - args.each do |data| - unless self.closed? - data.ticket = @ticket if @ticket and data.respond_to? :ticket= - log :sending, data - c.send data, :channel => @channel - else - unless data.class == AMQP::Protocol::Channel::CloseOk - raise ChannelClosedError.new(self) - end - end - end - end - } - end # send + @rpcs[rpc.name] = rpc + end # register_rpc(rpc) + # @private + # @api plugin + def find_rpc(name) + @rpcs[name] + end - def check_content_completion - if @body.length >= @header.size - @header.properties.update(@method.arguments) - @consumer.receive @header, @body if @consumer - @body = @header = @consumer = @method = nil - end - end # check_content_completion protected - def handle_method(frame) - case method = frame.payload - when Protocol::Channel::OpenOk - send Protocol::Access::Request.new(:realm => '/data', - :read => true, - :write => true, - :active => true, - :passive => true) - - when Protocol::Access::RequestOk - @ticket = method.ticket - callback { - send Protocol::Channel::Close.new(:reply_code => 200, - :reply_text => 'bye', - :method_id => 0, - :class_id => 0) - } if @closing - succeed - - when Protocol::Basic::CancelOk - if @consumer = consumers[ method.consumer_tag ] - @consumer.cancelled - else - AMQP::Channel.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}" - end - - when Protocol::Exchange::DeclareOk - # We can't use exchanges[method.exchange] because if the name would - # be an empty string, then AMQP broker generated a random one. - exchanges = self.exchanges.select { |exchange| exchange.opts[:nowait].eql?(false) } - exchange = exchanges.reverse.find { |exchange| exchange.status.eql?(:unfinished) } - exchange.receive_response method - - when Protocol::Queue::DeclareOk - queue = @queues_awaiting_declare_ok.shift - - queue.receive_status method - when Protocol::Queue::BindOk - # We can't use queues[method.queue] because if the name would - # be an empty string, then AMQP broker generated a random one. - queues = self.queues.select { |queue| queue.sync_bind } - queue = queues.reverse.find { |queue| queue.status.eql?(:unbound) } - queue.after_bind method - - when Protocol::Basic::Deliver, Protocol::Basic::GetOk - @method = method - @header = nil - @body = '' - - if method.is_a? Protocol::Basic::GetOk - @consumer = get_queue { |q| q.shift } - AMQP::Channel.error "No pending Basic.GetOk requests" unless @consumer - else - @consumer = consumers[ method.consumer_tag ] - AMQP::Channel.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer - end - - when Protocol::Basic::GetEmpty - if @consumer = get_queue { |q| q.shift } - @consumer.receive nil, nil - else - AMQP::Channel.error "Basic.GetEmpty for invalid consumer" - end - - when Protocol::Channel::Close - @status = :closed - AMQP::Channel.error "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}" - - when Protocol::Channel::CloseOk - @status = :closed - @on_close && @on_close.call(self) - - @closing = false - conn.callback { |c| - c.channels.delete @channel - c.close if c.channels.empty? - } - - when Protocol::Basic::ConsumeOk - if @consumer = consumers[ method.consumer_tag ] - @consumer.confirm_subscribe - else - AMQP::Channel.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}" - end - when Protocol::Basic::Return - @method = method - end # case - end # handle_method(frame) - - - - private - - def log(*args) - return unless AMQP::logging - pp args - puts - end # log - def validate_parameters_match!(entity, parameters) unless entity.opts == parameters || parameters[:passive] raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters) end end # validate_parameters_match!(entity, parameters) end # Channel end # AMQP - - -MQ = AMQP::Channel - -# -# Backwards compatibility with 0.6.x -# - -class MQ - # unique identifier - def MQ.id - Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" - end - - def MQ.default - # TODO: clear this when connection is closed - Thread.current[:mq] ||= MQ.new - end - - # Allows for calls to all MQ instance methods. This implicitly calls - # MQ.new so that a new channel is allocated for subsequent operations. - def MQ.method_missing(meth, *args, &blk) - MQ.default.__send__(meth, *args, &blk) - end -end