lib/mq.rb in tmm1-amqp-0.6.0 vs lib/mq.rb in tmm1-amqp-0.6.1
- old
+ new
@@ -213,12 +213,15 @@
@consumer = consumers[ method.consumer_tag ]
MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer
end
when Protocol::Basic::GetEmpty
- @consumer = get_queue{|q| q.shift }
- @consumer.receive nil, nil
+ if @consumer = get_queue{|q| q.shift }
+ @consumer.receive nil, nil
+ else
+ MQ.error "Basic.GetEmpty for invalid consumer"
+ end
when Protocol::Channel::Close
raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
when Protocol::Channel::CloseOk
@@ -515,10 +518,90 @@
# * :passive => true and the exchange does not exist (NOT_FOUND)
#
def topic name = 'amq.topic', opts = {}
exchanges[name] ||= Exchange.new(self, :topic, name, opts)
end
-
+
+ # Defines, intializes and returns an Exchange to act as an ingress
+ # point for all published messages.
+ #
+ # == Headers
+ # A headers exchange allows for messages to be published to an exchange
+ #
+ # Any published message, regardless of its persistence setting, is thrown
+ # away by the exchange when there are no queues bound to it.
+ #
+ # As part of the AMQP standard, each server _should_ predeclare a headers
+ # exchange called 'amq.match' (this is not required by the standard).
+ # Allocating this exchange without a name _or_ with the empty string
+ # will use the internal 'amq.match' exchange.
+ #
+ # TODO: The classic example is ...
+ #
+ # When publishing data to the exchange, bound queues subscribing to the
+ # exchange indicate which data interests them by passing arguments
+ # for matching against the headers in published messages. The
+ # form of the matching can be controlled by the 'x-match' argument, which
+ # may be 'any' or 'all'. If unspecified (in RabbitMQ at least), it defaults
+ # to "all".
+ #
+ # A value of 'all' for 'x-match' implies that all values must match (i.e.
+ # it does an AND of the headers ), while a value of 'any' implies that
+ # at least one should match (ie. it does an OR).
+ #
+ # TODO: document behavior when either the binding or the message is missing
+ # a header present in the other
+ #
+ # TODO: insert example
+ #
+ # == Options
+ # * :passive => true | false (default false)
+ # If set, the server will not create the exchange if it does not
+ # already exist. The client can use this to check whether an exchange
+ # exists without modifying the server state.
+ #
+ # * :durable => true | false (default false)
+ # If set when creating a new exchange, the exchange will be marked as
+ # durable. Durable exchanges remain active when a server restarts.
+ # Non-durable exchanges (transient exchanges) are purged if/when a
+ # server restarts.
+ #
+ # A transient exchange (the default) is stored in memory-only. The
+ # exchange and all bindings will be lost on a server restart.
+ # It makes no sense to publish a persistent message to a transient
+ # exchange.
+ #
+ # Durable exchanges and their bindings are recreated upon a server
+ # restart. Any published messages not routed to a bound queue are lost.
+ #
+ # * :auto_delete => true | false (default false)
+ # If set, the exchange is deleted when all queues have finished
+ # using it. The server waits for a short period of time before
+ # determining the exchange is unused to give time to the client code
+ # to bind a queue to it.
+ #
+ # If the exchange has been previously declared, this option is ignored
+ # on subsequent declarations.
+ #
+ # * :internal => true | false (default false)
+ # If set, the exchange may not be used directly by publishers, but
+ # only when bound to other exchanges. Internal exchanges are used to
+ # construct wiring that is not visible to applications.
+ #
+ # * :nowait => true | false (default true)
+ # If set, the server will not respond to the method. The client should
+ # not wait for a reply method. If the server could not complete the
+ # method it will raise a channel or connection exception.
+ #
+ # == Exceptions
+ # Doing any of these activities are illegal and will raise MQ:Error.
+ # * redeclare an already-declared exchange to a different type
+ # * :passive => true and the exchange does not exist (NOT_FOUND)
+ # * using a value other than "any" or "all" for "x-match"
+ def headers name = 'amq.match', opts = {}
+ exchanges[name] ||= Exchange.new(self, :headers, name, opts)
+ end
+
# Queues store and forward messages. Queues can be configured in the server
# or created at runtime. Queues must be attached to at least one exchange
# in order to receive messages from publishers.
#
# Like an Exchange, queue names starting with 'amq.' are reserved for
\ No newline at end of file