lib/mq.rb in amqp-0.6.7 vs lib/mq.rb in amqp-0.7.0.pre
- old
+ new
@@ -1,10 +1,11 @@
#:main: README
#
$:.unshift File.expand_path(File.dirname(File.expand_path(__FILE__)))
require 'amqp'
+require 'mq/collection'
class MQ
%w[ exchange queue rpc header ].each do |file|
require "mq/#{file}"
end
@@ -30,11 +31,11 @@
# is used for 1 to many communications. Each consumer generates a queue to
# receive messages and do some operation (in this case, print the time).
# One consumer prints messages every second while the second consumer prints
# messages every 2 seconds. After 5 seconds has elapsed, the 1 second
# consumer is deleted.
-#
+#
# Of interest is the relationship of EventMachine to the process. All MQ
# operations must occur within the context of an EM.run block. We start
# EventMachine in its own thread with an empty block; all subsequent calls
# to the MQ API add their blocks to the EM.run block. This demonstrates how
# the library could be used to build up and tear down communications outside
@@ -42,77 +43,77 @@
# other synchronous operations. See the EventMachine documentation for
# more information.
#
# require 'rubygems'
# require 'mq'
-#
+#
# thr = Thread.new { EM.run }
-#
+#
# # turns on extreme logging
# #AMQP.logging = true
-#
+#
# def log *args
# p args
# end
-#
+#
# def publisher
# clock = MQ.fanout('clock')
# EM.add_periodic_timer(1) do
# puts
-#
+#
# log :publishing, time = Time.now
# clock.publish(Marshal.dump(time))
# end
# end
-#
+#
# def one_second_consumer
# MQ.queue('every second').bind(MQ.fanout('clock')).subscribe do |time|
# log 'every second', :received, Marshal.load(time)
# end
# end
-#
+#
# def two_second_consumer
# MQ.queue('every 2 seconds').bind('clock').subscribe do |time|
# time = Marshal.load(time)
# log 'every 2 seconds', :received, time if time.sec % 2 == 0
# end
# end
-#
+#
# def delete_one_second
# EM.add_timer(5) do
# # delete the 'every second' queue
# log 'Deleting [every second] queue'
# MQ.queue('every second').delete
# end
# end
-#
+#
# publisher
# one_second_consumer
# two_second_consumer
# delete_one_second
# thr.join
-#
+#
# __END__
-#
+#
# [:publishing, Tue Jan 06 22:46:14 -0600 2009]
# ["every second", :received, Tue Jan 06 22:46:14 -0600 2009]
# ["every 2 seconds", :received, Tue Jan 06 22:46:14 -0600 2009]
-#
+#
# [:publishing, Tue Jan 06 22:46:16 -0600 2009]
# ["every second", :received, Tue Jan 06 22:46:16 -0600 2009]
# ["every 2 seconds", :received, Tue Jan 06 22:46:16 -0600 2009]
-#
+#
# [:publishing, Tue Jan 06 22:46:17 -0600 2009]
# ["every second", :received, Tue Jan 06 22:46:17 -0600 2009]
-#
+#
# [:publishing, Tue Jan 06 22:46:18 -0600 2009]
# ["every second", :received, Tue Jan 06 22:46:18 -0600 2009]
# ["every 2 seconds", :received, Tue Jan 06 22:46:18 -0600 2009]
# ["Deleting [every second] queue"]
-#
+#
# [:publishing, Tue Jan 06 22:46:19 -0600 2009]
-#
+#
# [:publishing, Tue Jan 06 22:46:20 -0600 2009]
# ["every 2 seconds", :received, Tue Jan 06 22:46:20 -0600 2009]
#
class MQ
include AMQP
@@ -144,13 +145,21 @@
@channel = c.add_channel(self)
send Protocol::Channel::Open.new
}
end
attr_reader :channel, :connection
-
+
+ def check_content_completion
+ if @body.length >= @header.size
+ @header.properties.update(@method.arguments)
+ @consumer.receive @header, @body if @consumer
+ @body = @header = @consumer = @method = nil
+ end
+ end
+
# May raise a MQ::Error exception when the frame payload contains a
- # Protocol::Channel::Close object.
+ # Protocol::Channel::Close object.
#
# This usually occurs when a client attempts to perform an illegal
# operation. A short, and incomplete, list of potential illegal operations
# follows:
# * publish a message to a deleted exchange (NOT_FOUND)
@@ -161,18 +170,15 @@
case frame
when Frame::Header
@header = frame.payload
@body = ''
+ check_content_completion
when Frame::Body
@body << frame.payload
- if @body.length >= @header.size
- @header.properties.update(@method.arguments)
- @consumer.receive @header, @body if @consumer
- @body = @header = @consumer = @method = nil
- end
+ check_content_completion
when Frame::Method
case method = frame.payload
when Protocol::Channel::OpenOk
send Protocol::Access::Request.new(:realm => '/data',
@@ -196,13 +202,31 @@
@consumer.cancelled
else
MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
end
+ when Protocol::Exchange::DeclareOk
+ # We can't use exchanges[method.exchange] because if the name would
+ # be an empty string, then AMQP broker generated a random one.
+ exchanges = self.exchanges.select { |exchange| exchange.opts[:nowait].eql?(false) }
+ exchange = exchanges.reverse.find { |exchange| exchange.status.eql?(:unfinished) }
+ exchange.receive_response method
+
when Protocol::Queue::DeclareOk
- queues[ method.queue ].receive_status method
+ # We can't use queues[method.queue] because if the name would
+ # be an empty string, then AMQP broker generated a random one.
+ queues = self.queues.select { |queue| queue.opts[:nowait].eql?(false) }
+ queue = queues.reverse.find { |queue| queue.status.eql?(:unfinished) }
+ queue.receive_status method
+ when Protocol::Queue::BindOk
+ # We can't use queues[method.queue] because if the name would
+ # be an empty string, then AMQP broker generated a random one.
+ queues = self.queues.select { |queue| queue.opts[:nowait].eql?(false) }
+ queue = queues.reverse.find { |queue| queue.status.eql?(:unbound) }
+ queue.after_bind method
+
when Protocol::Basic::Deliver, Protocol::Basic::GetOk
@method = method
@header = nil
@body = ''
@@ -223,10 +247,12 @@
when Protocol::Channel::Close
raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
when Protocol::Channel::CloseOk
+ @on_close && @on_close.call(self)
+
@closing = false
conn.callback{ |c|
c.channels.delete @channel
c.close if c.channels.empty?
}
@@ -257,12 +283,12 @@
# point for all published messages.
#
# == Direct
# A direct exchange is useful for 1:1 communication between a publisher and
# subscriber. Messages are routed to the queue with a binding that shares
- # the same name as the exchange. Alternately, the messages are routed to
- # the bound queue that shares the same name as the routing key used for
+ # the same name as the exchange. Alternately, the messages are routed to
+ # the bound queue that shares the same name as the routing key used for
# defining the exchange. This exchange type does not honor the +:key+ option
# when defining a new instance with a name. It _will_ honor the +:key+ option
# if the exchange name is the empty string.
# Allocating this exchange without a name _or_ with the empty string
# will use the internal 'amq.direct' exchange.
@@ -285,23 +311,23 @@
# == Options
# * :passive => true | false (default false)
# If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
- #
+ #
# * :durable => true | false (default false)
# If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges remain active when a server restarts.
# Non-durable exchanges (transient exchanges) are purged if/when a
- # server restarts.
+ # server restarts.
#
# A transient exchange (the default) is stored in memory-only. The
# exchange and all bindings will be lost on a server restart.
# It makes no sense to publish a persistent message to a transient
# exchange.
#
- # Durable exchanges and their bindings are recreated upon a server
+ # Durable exchanges and their bindings are recreated upon a server
# restart. Any published messages not routed to a bound queue are lost.
#
# * :auto_delete => true | false (default false)
# If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
@@ -324,29 +350,29 @@
# == Exceptions
# Doing any of these activities are illegal and will raise MQ:Error.
# * redeclare an already-declared exchange to a different type
# * :passive => true and the exchange does not exist (NOT_FOUND)
#
- def direct name = 'amq.direct', opts = {}
- exchanges[name] ||= Exchange.new(self, :direct, name, opts)
+ def direct name = 'amq.direct', opts = {}, &block
+ self.exchanges << Exchange.new(self, :direct, name, opts, &block)
end
# Defines, intializes and returns an Exchange to act as an ingress
# point for all published messages.
#
# == Fanout
- # A fanout exchange is useful for 1:N communication where one publisher
- # feeds multiple subscribers. Like direct exchanges, messages published
- # to a fanout exchange are delivered to queues whose name matches the
- # exchange name (or are bound to that exchange name). Each queue gets
+ # A fanout exchange is useful for 1:N communication where one publisher
+ # feeds multiple subscribers. Like direct exchanges, messages published
+ # to a fanout exchange are delivered to queues whose name matches the
+ # exchange name (or are bound to that exchange name). Each queue gets
# its own copy of the message.
#
# Any published message, regardless of its persistence setting, is thrown
# away by the exchange when there are no queues bound to it.
#
- # Like the direct exchange type, this exchange type does not honor the
- # +:key+ option when defining a new instance with a name. It _will_ honor
+ # Like the direct exchange type, this exchange type does not honor the
+ # +:key+ option when defining a new instance with a name. It _will_ honor
# the +:key+ option if the exchange name is the empty string.
# Allocating this exchange without a name _or_ with the empty string
# will use the internal 'amq.fanout' exchange.
#
# EM.run do
@@ -371,23 +397,23 @@
# == Options
# * :passive => true | false (default false)
# If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
- #
+ #
# * :durable => true | false (default false)
# If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges remain active when a server restarts.
# Non-durable exchanges (transient exchanges) are purged if/when a
- # server restarts.
+ # server restarts.
#
# A transient exchange (the default) is stored in memory-only. The
# exchange and all bindings will be lost on a server restart.
# It makes no sense to publish a persistent message to a transient
# exchange.
#
- # Durable exchanges and their bindings are recreated upon a server
+ # Durable exchanges and their bindings are recreated upon a server
# restart. Any published messages not routed to a bound queue are lost.
#
# * :auto_delete => true | false (default false)
# If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
@@ -410,38 +436,38 @@
# == Exceptions
# Doing any of these activities are illegal and will raise MQ:Error.
# * redeclare an already-declared exchange to a different type
# * :passive => true and the exchange does not exist (NOT_FOUND)
#
- def fanout name = 'amq.fanout', opts = {}
- exchanges[name] ||= Exchange.new(self, :fanout, name, opts)
+ def fanout name = 'amq.fanout', opts = {}, &block
+ self.exchanges << Exchange.new(self, :fanout, name, opts, &block)
end
# Defines, intializes and returns an Exchange to act as an ingress
# point for all published messages.
#
# == Topic
- # A topic exchange allows for messages to be published to an exchange
+ # A topic exchange allows for messages to be published to an exchange
# tagged with a specific routing key. The Exchange uses the routing key
- # to determine which queues to deliver the message. Wildcard matching
- # is allowed. The topic must be declared using dot notation to separate
+ # to determine which queues to deliver the message. Wildcard matching
+ # is allowed. The topic must be declared using dot notation to separate
# each subtopic.
#
# This is the only exchange type to honor the +key+ hash key for all
# cases.
#
# Any published message, regardless of its persistence setting, is thrown
# away by the exchange when there are no queues bound to it.
#
- # As part of the AMQP standard, each server _should_ predeclare a topic
+ # As part of the AMQP standard, each server _should_ predeclare a topic
# exchange called 'amq.topic' (this is not required by the standard).
# Allocating this exchange without a name _or_ with the empty string
# will use the internal 'amq.topic' exchange.
#
# The classic example is delivering market data. When publishing market
- # data for stocks, we may subdivide the stream based on 2
- # characteristics: nation code and trading symbol. The topic tree for
+ # data for stocks, we may subdivide the stream based on 2
+ # characteristics: nation code and trading symbol. The topic tree for
# Apple Computer would look like:
# 'stock.us.aapl'
# For a foreign stock, it may look like:
# 'stock.de.dax'
#
@@ -472,34 +498,34 @@
# MQ.queue('only dax').bind(exch, :key => 'stock.de.dax').subscribe do |price|
# puts "dax price [#{price}]"
# end
# end
#
- # For matching, the '*' (asterisk) wildcard matches against one
- # dot-separated item only. The '#' wildcard (hash or pound symbol)
- # matches against 0 or more dot-separated items. If none of these
- # symbols are used, the exchange performs a comparison looking for an
+ # For matching, the '*' (asterisk) wildcard matches against one
+ # dot-separated item only. The '#' wildcard (hash or pound symbol)
+ # matches against 0 or more dot-separated items. If none of these
+ # symbols are used, the exchange performs a comparison looking for an
# exact match.
#
# == Options
# * :passive => true | false (default false)
# If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
- #
+ #
# * :durable => true | false (default false)
# If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges remain active when a server restarts.
# Non-durable exchanges (transient exchanges) are purged if/when a
- # server restarts.
+ # server restarts.
#
# A transient exchange (the default) is stored in memory-only. The
# exchange and all bindings will be lost on a server restart.
# It makes no sense to publish a persistent message to a transient
# exchange.
#
- # Durable exchanges and their bindings are recreated upon a server
+ # Durable exchanges and their bindings are recreated upon a server
# restart. Any published messages not routed to a bound queue are lost.
#
# * :auto_delete => true | false (default false)
# If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
@@ -522,39 +548,39 @@
# == Exceptions
# Doing any of these activities are illegal and will raise MQ:Error.
# * redeclare an already-declared exchange to a different type
# * :passive => true and the exchange does not exist (NOT_FOUND)
#
- def topic name = 'amq.topic', opts = {}
- exchanges[name] ||= Exchange.new(self, :topic, name, opts)
+ def topic name = 'amq.topic', opts = {}, &block
+ self.exchanges << Exchange.new(self, :topic, name, opts, &block)
end
# Defines, intializes and returns an Exchange to act as an ingress
# point for all published messages.
#
# == Headers
- # A headers exchange allows for messages to be published to an exchange
+ # A headers exchange allows for messages to be published to an exchange
#
# Any published message, regardless of its persistence setting, is thrown
# away by the exchange when there are no queues bound to it.
#
- # As part of the AMQP standard, each server _should_ predeclare a headers
+ # As part of the AMQP standard, each server _should_ predeclare a headers
# exchange called 'amq.match' (this is not required by the standard).
# Allocating this exchange without a name _or_ with the empty string
# will use the internal 'amq.match' exchange.
#
- # TODO: The classic example is ...
+ # TODO: The classic example is ...
#
# When publishing data to the exchange, bound queues subscribing to the
# exchange indicate which data interests them by passing arguments
# for matching against the headers in published messages. The
# form of the matching can be controlled by the 'x-match' argument, which
# may be 'any' or 'all'. If unspecified (in RabbitMQ at least), it defaults
# to "all".
#
- # A value of 'all' for 'x-match' implies that all values must match (i.e.
- # it does an AND of the headers ), while a value of 'any' implies that
+ # A value of 'all' for 'x-match' implies that all values must match (i.e.
+ # it does an AND of the headers ), while a value of 'any' implies that
# at least one should match (ie. it does an OR).
#
# TODO: document behavior when either the binding or the message is missing
# a header present in the other
#
@@ -563,23 +589,23 @@
# == Options
# * :passive => true | false (default false)
# If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
- #
+ #
# * :durable => true | false (default false)
# If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges remain active when a server restarts.
# Non-durable exchanges (transient exchanges) are purged if/when a
- # server restarts.
+ # server restarts.
#
# A transient exchange (the default) is stored in memory-only. The
# exchange and all bindings will be lost on a server restart.
# It makes no sense to publish a persistent message to a transient
# exchange.
#
- # Durable exchanges and their bindings are recreated upon a server
+ # Durable exchanges and their bindings are recreated upon a server
# restart. Any published messages not routed to a bound queue are lost.
#
# * :auto_delete => true | false (default false)
# If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
@@ -602,12 +628,12 @@
# == Exceptions
# Doing any of these activities are illegal and will raise MQ:Error.
# * redeclare an already-declared exchange to a different type
# * :passive => true and the exchange does not exist (NOT_FOUND)
# * using a value other than "any" or "all" for "x-match"
- def headers name = 'amq.match', opts = {}
- exchanges[name] ||= Exchange.new(self, :headers, name, opts)
+ def headers name = 'amq.match', opts = {}, &block
+ self.exchanges << Exchange.new(self, :headers, name, opts, &block)
end
# Queues store and forward messages. Queues can be configured in the server
# or created at runtime. Queues must be attached to at least one exchange
# in order to receive messages from publishers.
@@ -622,11 +648,11 @@
# == Options
# * :passive => true | false (default false)
# If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
- #
+ #
# * :durable => true | false (default false)
# If set when creating a new queue, the queue will be marked as
# durable. Durable queues remain active when a server restarts.
# Non-durable queues (transient queues) are purged if/when a
# server restarts. Note that durable queues do not necessarily
@@ -655,11 +681,11 @@
#
# * :auto_delete = true | false (default false)
# If set, the queue is deleted when all consumers have finished
# using it. Last consumer can be cancelled either explicitly or because
# its channel is closed. If there was no consumer ever on the queue, it
- # won't be deleted.
+ # won't be deleted.
#
# The server waits for a short period of time before
# determining the queue is unused to give time to the client code
# to bind an exchange to it.
#
@@ -672,31 +698,35 @@
# * :nowait => true | false (default true)
# If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
- def queue name, opts = {}
- queues[name] ||= Queue.new(self, name, opts)
+ def queue name, opts = {}, &block
+ self.queues << Queue.new(self, name, opts, &block)
end
+ def queue! name, opts = {}, &block
+ self.queues.add! Queue.new(self, name, opts, &block)
+ end
+
# Takes a channel, queue and optional object.
#
# The optional object may be a class name, module name or object
# instance. When given a class or module name, the object is instantiated
# during this setup. The passed queue is automatically subscribed to so
# it passes all messages (and their arguments) to the object.
#
# Marshalling and unmarshalling the objects is handled internally. This
# marshalling is subject to the same restrictions as defined in the
- # Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard
+ # Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard
# library. See that documentation for further reference.
#
- # When the optional object is not passed, the returned rpc reference is
- # used to send messages and arguments to the queue. See #method_missing
- # which does all of the heavy lifting with the proxy. Some client
- # elsewhere must call this method *with* the optional block so that
- # there is a valid destination. Failure to do so will just enqueue
+ # When the optional object is not passed, the returned rpc reference is
+ # used to send messages and arguments to the queue. See #method_missing
+ # which does all of the heavy lifting with the proxy. Some client
+ # elsewhere must call this method *with* the optional block so that
+ # there is a valid destination. Failure to do so will just enqueue
# marshalled messages that are never consumed.
#
# EM.run do
# server = MQ.rpc('hash table node', Hash)
#
@@ -716,11 +746,12 @@
#
def rpc name, obj = nil
rpcs[name] ||= RPC.new(self, name, obj)
end
- def close
+ def close(&block)
+ @on_close = block
if @deferred_status == :succeeded
send Protocol::Channel::Close.new(:reply_code => 200,
:reply_text => 'bye',
:method_id => 0,
:class_id => 0)
@@ -760,18 +791,18 @@
# Returns a hash of all the exchange proxy objects.
#
# Not typically called by client code.
def exchanges
- @exchanges ||= {}
+ @exchanges ||= MQ::Collection.new
end
# Returns a hash of all the queue proxy objects.
#
# Not typically called by client code.
def queues
- @queues ||= {}
+ @queues ||= MQ::Collection.new
end
def get_queue
if block_given?
(@get_queue_mutex ||= Mutex.new).synchronize{
@@ -842,6 +873,6 @@
class MQ
# unique identifier
def MQ.id
Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
end
-end
\ No newline at end of file
+end