lib/mq.rb in amqp-0.6.7 vs lib/mq.rb in amqp-0.7.0.pre

- old
+ new

@@ -1,10 +1,11 @@ #:main: README # $:.unshift File.expand_path(File.dirname(File.expand_path(__FILE__))) require 'amqp' +require 'mq/collection' class MQ %w[ exchange queue rpc header ].each do |file| require "mq/#{file}" end @@ -30,11 +31,11 @@ # is used for 1 to many communications. Each consumer generates a queue to # receive messages and do some operation (in this case, print the time). # One consumer prints messages every second while the second consumer prints # messages every 2 seconds. After 5 seconds has elapsed, the 1 second # consumer is deleted. -# +# # Of interest is the relationship of EventMachine to the process. All MQ # operations must occur within the context of an EM.run block. We start # EventMachine in its own thread with an empty block; all subsequent calls # to the MQ API add their blocks to the EM.run block. This demonstrates how # the library could be used to build up and tear down communications outside @@ -42,77 +43,77 @@ # other synchronous operations. See the EventMachine documentation for # more information. # # require 'rubygems' # require 'mq' -# +# # thr = Thread.new { EM.run } -# +# # # turns on extreme logging # #AMQP.logging = true -# +# # def log *args # p args # end -# +# # def publisher # clock = MQ.fanout('clock') # EM.add_periodic_timer(1) do # puts -# +# # log :publishing, time = Time.now # clock.publish(Marshal.dump(time)) # end # end -# +# # def one_second_consumer # MQ.queue('every second').bind(MQ.fanout('clock')).subscribe do |time| # log 'every second', :received, Marshal.load(time) # end # end -# +# # def two_second_consumer # MQ.queue('every 2 seconds').bind('clock').subscribe do |time| # time = Marshal.load(time) # log 'every 2 seconds', :received, time if time.sec % 2 == 0 # end # end -# +# # def delete_one_second # EM.add_timer(5) do # # delete the 'every second' queue # log 'Deleting [every second] queue' # MQ.queue('every second').delete # end # end -# +# # publisher # one_second_consumer # two_second_consumer # delete_one_second # thr.join -# +# # __END__ -# +# # [:publishing, Tue Jan 06 22:46:14 -0600 2009] # ["every second", :received, Tue Jan 06 22:46:14 -0600 2009] # ["every 2 seconds", :received, Tue Jan 06 22:46:14 -0600 2009] -# +# # [:publishing, Tue Jan 06 22:46:16 -0600 2009] # ["every second", :received, Tue Jan 06 22:46:16 -0600 2009] # ["every 2 seconds", :received, Tue Jan 06 22:46:16 -0600 2009] -# +# # [:publishing, Tue Jan 06 22:46:17 -0600 2009] # ["every second", :received, Tue Jan 06 22:46:17 -0600 2009] -# +# # [:publishing, Tue Jan 06 22:46:18 -0600 2009] # ["every second", :received, Tue Jan 06 22:46:18 -0600 2009] # ["every 2 seconds", :received, Tue Jan 06 22:46:18 -0600 2009] # ["Deleting [every second] queue"] -# +# # [:publishing, Tue Jan 06 22:46:19 -0600 2009] -# +# # [:publishing, Tue Jan 06 22:46:20 -0600 2009] # ["every 2 seconds", :received, Tue Jan 06 22:46:20 -0600 2009] # class MQ include AMQP @@ -144,13 +145,21 @@ @channel = c.add_channel(self) send Protocol::Channel::Open.new } end attr_reader :channel, :connection - + + def check_content_completion + if @body.length >= @header.size + @header.properties.update(@method.arguments) + @consumer.receive @header, @body if @consumer + @body = @header = @consumer = @method = nil + end + end + # May raise a MQ::Error exception when the frame payload contains a - # Protocol::Channel::Close object. + # Protocol::Channel::Close object. # # This usually occurs when a client attempts to perform an illegal # operation. A short, and incomplete, list of potential illegal operations # follows: # * publish a message to a deleted exchange (NOT_FOUND) @@ -161,18 +170,15 @@ case frame when Frame::Header @header = frame.payload @body = '' + check_content_completion when Frame::Body @body << frame.payload - if @body.length >= @header.size - @header.properties.update(@method.arguments) - @consumer.receive @header, @body if @consumer - @body = @header = @consumer = @method = nil - end + check_content_completion when Frame::Method case method = frame.payload when Protocol::Channel::OpenOk send Protocol::Access::Request.new(:realm => '/data', @@ -196,13 +202,31 @@ @consumer.cancelled else MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}" end + when Protocol::Exchange::DeclareOk + # We can't use exchanges[method.exchange] because if the name would + # be an empty string, then AMQP broker generated a random one. + exchanges = self.exchanges.select { |exchange| exchange.opts[:nowait].eql?(false) } + exchange = exchanges.reverse.find { |exchange| exchange.status.eql?(:unfinished) } + exchange.receive_response method + when Protocol::Queue::DeclareOk - queues[ method.queue ].receive_status method + # We can't use queues[method.queue] because if the name would + # be an empty string, then AMQP broker generated a random one. + queues = self.queues.select { |queue| queue.opts[:nowait].eql?(false) } + queue = queues.reverse.find { |queue| queue.status.eql?(:unfinished) } + queue.receive_status method + when Protocol::Queue::BindOk + # We can't use queues[method.queue] because if the name would + # be an empty string, then AMQP broker generated a random one. + queues = self.queues.select { |queue| queue.opts[:nowait].eql?(false) } + queue = queues.reverse.find { |queue| queue.status.eql?(:unbound) } + queue.after_bind method + when Protocol::Basic::Deliver, Protocol::Basic::GetOk @method = method @header = nil @body = '' @@ -223,10 +247,12 @@ when Protocol::Channel::Close raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}" when Protocol::Channel::CloseOk + @on_close && @on_close.call(self) + @closing = false conn.callback{ |c| c.channels.delete @channel c.close if c.channels.empty? } @@ -257,12 +283,12 @@ # point for all published messages. # # == Direct # A direct exchange is useful for 1:1 communication between a publisher and # subscriber. Messages are routed to the queue with a binding that shares - # the same name as the exchange. Alternately, the messages are routed to - # the bound queue that shares the same name as the routing key used for + # the same name as the exchange. Alternately, the messages are routed to + # the bound queue that shares the same name as the routing key used for # defining the exchange. This exchange type does not honor the +:key+ option # when defining a new instance with a name. It _will_ honor the +:key+ option # if the exchange name is the empty string. # Allocating this exchange without a name _or_ with the empty string # will use the internal 'amq.direct' exchange. @@ -285,23 +311,23 @@ # == Options # * :passive => true | false (default false) # If set, the server will not create the exchange if it does not # already exist. The client can use this to check whether an exchange # exists without modifying the server state. - # + # # * :durable => true | false (default false) # If set when creating a new exchange, the exchange will be marked as # durable. Durable exchanges remain active when a server restarts. # Non-durable exchanges (transient exchanges) are purged if/when a - # server restarts. + # server restarts. # # A transient exchange (the default) is stored in memory-only. The # exchange and all bindings will be lost on a server restart. # It makes no sense to publish a persistent message to a transient # exchange. # - # Durable exchanges and their bindings are recreated upon a server + # Durable exchanges and their bindings are recreated upon a server # restart. Any published messages not routed to a bound queue are lost. # # * :auto_delete => true | false (default false) # If set, the exchange is deleted when all queues have finished # using it. The server waits for a short period of time before @@ -324,29 +350,29 @@ # == Exceptions # Doing any of these activities are illegal and will raise MQ:Error. # * redeclare an already-declared exchange to a different type # * :passive => true and the exchange does not exist (NOT_FOUND) # - def direct name = 'amq.direct', opts = {} - exchanges[name] ||= Exchange.new(self, :direct, name, opts) + def direct name = 'amq.direct', opts = {}, &block + self.exchanges << Exchange.new(self, :direct, name, opts, &block) end # Defines, intializes and returns an Exchange to act as an ingress # point for all published messages. # # == Fanout - # A fanout exchange is useful for 1:N communication where one publisher - # feeds multiple subscribers. Like direct exchanges, messages published - # to a fanout exchange are delivered to queues whose name matches the - # exchange name (or are bound to that exchange name). Each queue gets + # A fanout exchange is useful for 1:N communication where one publisher + # feeds multiple subscribers. Like direct exchanges, messages published + # to a fanout exchange are delivered to queues whose name matches the + # exchange name (or are bound to that exchange name). Each queue gets # its own copy of the message. # # Any published message, regardless of its persistence setting, is thrown # away by the exchange when there are no queues bound to it. # - # Like the direct exchange type, this exchange type does not honor the - # +:key+ option when defining a new instance with a name. It _will_ honor + # Like the direct exchange type, this exchange type does not honor the + # +:key+ option when defining a new instance with a name. It _will_ honor # the +:key+ option if the exchange name is the empty string. # Allocating this exchange without a name _or_ with the empty string # will use the internal 'amq.fanout' exchange. # # EM.run do @@ -371,23 +397,23 @@ # == Options # * :passive => true | false (default false) # If set, the server will not create the exchange if it does not # already exist. The client can use this to check whether an exchange # exists without modifying the server state. - # + # # * :durable => true | false (default false) # If set when creating a new exchange, the exchange will be marked as # durable. Durable exchanges remain active when a server restarts. # Non-durable exchanges (transient exchanges) are purged if/when a - # server restarts. + # server restarts. # # A transient exchange (the default) is stored in memory-only. The # exchange and all bindings will be lost on a server restart. # It makes no sense to publish a persistent message to a transient # exchange. # - # Durable exchanges and their bindings are recreated upon a server + # Durable exchanges and their bindings are recreated upon a server # restart. Any published messages not routed to a bound queue are lost. # # * :auto_delete => true | false (default false) # If set, the exchange is deleted when all queues have finished # using it. The server waits for a short period of time before @@ -410,38 +436,38 @@ # == Exceptions # Doing any of these activities are illegal and will raise MQ:Error. # * redeclare an already-declared exchange to a different type # * :passive => true and the exchange does not exist (NOT_FOUND) # - def fanout name = 'amq.fanout', opts = {} - exchanges[name] ||= Exchange.new(self, :fanout, name, opts) + def fanout name = 'amq.fanout', opts = {}, &block + self.exchanges << Exchange.new(self, :fanout, name, opts, &block) end # Defines, intializes and returns an Exchange to act as an ingress # point for all published messages. # # == Topic - # A topic exchange allows for messages to be published to an exchange + # A topic exchange allows for messages to be published to an exchange # tagged with a specific routing key. The Exchange uses the routing key - # to determine which queues to deliver the message. Wildcard matching - # is allowed. The topic must be declared using dot notation to separate + # to determine which queues to deliver the message. Wildcard matching + # is allowed. The topic must be declared using dot notation to separate # each subtopic. # # This is the only exchange type to honor the +key+ hash key for all # cases. # # Any published message, regardless of its persistence setting, is thrown # away by the exchange when there are no queues bound to it. # - # As part of the AMQP standard, each server _should_ predeclare a topic + # As part of the AMQP standard, each server _should_ predeclare a topic # exchange called 'amq.topic' (this is not required by the standard). # Allocating this exchange without a name _or_ with the empty string # will use the internal 'amq.topic' exchange. # # The classic example is delivering market data. When publishing market - # data for stocks, we may subdivide the stream based on 2 - # characteristics: nation code and trading symbol. The topic tree for + # data for stocks, we may subdivide the stream based on 2 + # characteristics: nation code and trading symbol. The topic tree for # Apple Computer would look like: # 'stock.us.aapl' # For a foreign stock, it may look like: # 'stock.de.dax' # @@ -472,34 +498,34 @@ # MQ.queue('only dax').bind(exch, :key => 'stock.de.dax').subscribe do |price| # puts "dax price [#{price}]" # end # end # - # For matching, the '*' (asterisk) wildcard matches against one - # dot-separated item only. The '#' wildcard (hash or pound symbol) - # matches against 0 or more dot-separated items. If none of these - # symbols are used, the exchange performs a comparison looking for an + # For matching, the '*' (asterisk) wildcard matches against one + # dot-separated item only. The '#' wildcard (hash or pound symbol) + # matches against 0 or more dot-separated items. If none of these + # symbols are used, the exchange performs a comparison looking for an # exact match. # # == Options # * :passive => true | false (default false) # If set, the server will not create the exchange if it does not # already exist. The client can use this to check whether an exchange # exists without modifying the server state. - # + # # * :durable => true | false (default false) # If set when creating a new exchange, the exchange will be marked as # durable. Durable exchanges remain active when a server restarts. # Non-durable exchanges (transient exchanges) are purged if/when a - # server restarts. + # server restarts. # # A transient exchange (the default) is stored in memory-only. The # exchange and all bindings will be lost on a server restart. # It makes no sense to publish a persistent message to a transient # exchange. # - # Durable exchanges and their bindings are recreated upon a server + # Durable exchanges and their bindings are recreated upon a server # restart. Any published messages not routed to a bound queue are lost. # # * :auto_delete => true | false (default false) # If set, the exchange is deleted when all queues have finished # using it. The server waits for a short period of time before @@ -522,39 +548,39 @@ # == Exceptions # Doing any of these activities are illegal and will raise MQ:Error. # * redeclare an already-declared exchange to a different type # * :passive => true and the exchange does not exist (NOT_FOUND) # - def topic name = 'amq.topic', opts = {} - exchanges[name] ||= Exchange.new(self, :topic, name, opts) + def topic name = 'amq.topic', opts = {}, &block + self.exchanges << Exchange.new(self, :topic, name, opts, &block) end # Defines, intializes and returns an Exchange to act as an ingress # point for all published messages. # # == Headers - # A headers exchange allows for messages to be published to an exchange + # A headers exchange allows for messages to be published to an exchange # # Any published message, regardless of its persistence setting, is thrown # away by the exchange when there are no queues bound to it. # - # As part of the AMQP standard, each server _should_ predeclare a headers + # As part of the AMQP standard, each server _should_ predeclare a headers # exchange called 'amq.match' (this is not required by the standard). # Allocating this exchange without a name _or_ with the empty string # will use the internal 'amq.match' exchange. # - # TODO: The classic example is ... + # TODO: The classic example is ... # # When publishing data to the exchange, bound queues subscribing to the # exchange indicate which data interests them by passing arguments # for matching against the headers in published messages. The # form of the matching can be controlled by the 'x-match' argument, which # may be 'any' or 'all'. If unspecified (in RabbitMQ at least), it defaults # to "all". # - # A value of 'all' for 'x-match' implies that all values must match (i.e. - # it does an AND of the headers ), while a value of 'any' implies that + # A value of 'all' for 'x-match' implies that all values must match (i.e. + # it does an AND of the headers ), while a value of 'any' implies that # at least one should match (ie. it does an OR). # # TODO: document behavior when either the binding or the message is missing # a header present in the other # @@ -563,23 +589,23 @@ # == Options # * :passive => true | false (default false) # If set, the server will not create the exchange if it does not # already exist. The client can use this to check whether an exchange # exists without modifying the server state. - # + # # * :durable => true | false (default false) # If set when creating a new exchange, the exchange will be marked as # durable. Durable exchanges remain active when a server restarts. # Non-durable exchanges (transient exchanges) are purged if/when a - # server restarts. + # server restarts. # # A transient exchange (the default) is stored in memory-only. The # exchange and all bindings will be lost on a server restart. # It makes no sense to publish a persistent message to a transient # exchange. # - # Durable exchanges and their bindings are recreated upon a server + # Durable exchanges and their bindings are recreated upon a server # restart. Any published messages not routed to a bound queue are lost. # # * :auto_delete => true | false (default false) # If set, the exchange is deleted when all queues have finished # using it. The server waits for a short period of time before @@ -602,12 +628,12 @@ # == Exceptions # Doing any of these activities are illegal and will raise MQ:Error. # * redeclare an already-declared exchange to a different type # * :passive => true and the exchange does not exist (NOT_FOUND) # * using a value other than "any" or "all" for "x-match" - def headers name = 'amq.match', opts = {} - exchanges[name] ||= Exchange.new(self, :headers, name, opts) + def headers name = 'amq.match', opts = {}, &block + self.exchanges << Exchange.new(self, :headers, name, opts, &block) end # Queues store and forward messages. Queues can be configured in the server # or created at runtime. Queues must be attached to at least one exchange # in order to receive messages from publishers. @@ -622,11 +648,11 @@ # == Options # * :passive => true | false (default false) # If set, the server will not create the exchange if it does not # already exist. The client can use this to check whether an exchange # exists without modifying the server state. - # + # # * :durable => true | false (default false) # If set when creating a new queue, the queue will be marked as # durable. Durable queues remain active when a server restarts. # Non-durable queues (transient queues) are purged if/when a # server restarts. Note that durable queues do not necessarily @@ -655,11 +681,11 @@ # # * :auto_delete = true | false (default false) # If set, the queue is deleted when all consumers have finished # using it. Last consumer can be cancelled either explicitly or because # its channel is closed. If there was no consumer ever on the queue, it - # won't be deleted. + # won't be deleted. # # The server waits for a short period of time before # determining the queue is unused to give time to the client code # to bind an exchange to it. # @@ -672,31 +698,35 @@ # * :nowait => true | false (default true) # If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # - def queue name, opts = {} - queues[name] ||= Queue.new(self, name, opts) + def queue name, opts = {}, &block + self.queues << Queue.new(self, name, opts, &block) end + def queue! name, opts = {}, &block + self.queues.add! Queue.new(self, name, opts, &block) + end + # Takes a channel, queue and optional object. # # The optional object may be a class name, module name or object # instance. When given a class or module name, the object is instantiated # during this setup. The passed queue is automatically subscribed to so # it passes all messages (and their arguments) to the object. # # Marshalling and unmarshalling the objects is handled internally. This # marshalling is subject to the same restrictions as defined in the - # Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard + # Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard # library. See that documentation for further reference. # - # When the optional object is not passed, the returned rpc reference is - # used to send messages and arguments to the queue. See #method_missing - # which does all of the heavy lifting with the proxy. Some client - # elsewhere must call this method *with* the optional block so that - # there is a valid destination. Failure to do so will just enqueue + # When the optional object is not passed, the returned rpc reference is + # used to send messages and arguments to the queue. See #method_missing + # which does all of the heavy lifting with the proxy. Some client + # elsewhere must call this method *with* the optional block so that + # there is a valid destination. Failure to do so will just enqueue # marshalled messages that are never consumed. # # EM.run do # server = MQ.rpc('hash table node', Hash) # @@ -716,11 +746,12 @@ # def rpc name, obj = nil rpcs[name] ||= RPC.new(self, name, obj) end - def close + def close(&block) + @on_close = block if @deferred_status == :succeeded send Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, :class_id => 0) @@ -760,18 +791,18 @@ # Returns a hash of all the exchange proxy objects. # # Not typically called by client code. def exchanges - @exchanges ||= {} + @exchanges ||= MQ::Collection.new end # Returns a hash of all the queue proxy objects. # # Not typically called by client code. def queues - @queues ||= {} + @queues ||= MQ::Collection.new end def get_queue if block_given? (@get_queue_mutex ||= Mutex.new).synchronize{ @@ -842,6 +873,6 @@ class MQ # unique identifier def MQ.id Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" end -end \ No newline at end of file +end