lib/mq.rb in tmm1-amqp-0.6.0 vs lib/mq.rb in tmm1-amqp-0.6.1

- old
+ new

@@ -213,12 +213,15 @@ @consumer = consumers[ method.consumer_tag ] MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer end when Protocol::Basic::GetEmpty - @consumer = get_queue{|q| q.shift } - @consumer.receive nil, nil + if @consumer = get_queue{|q| q.shift } + @consumer.receive nil, nil + else + MQ.error "Basic.GetEmpty for invalid consumer" + end when Protocol::Channel::Close raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}" when Protocol::Channel::CloseOk @@ -515,10 +518,90 @@ # * :passive => true and the exchange does not exist (NOT_FOUND) # def topic name = 'amq.topic', opts = {} exchanges[name] ||= Exchange.new(self, :topic, name, opts) end - + + # Defines, intializes and returns an Exchange to act as an ingress + # point for all published messages. + # + # == Headers + # A headers exchange allows for messages to be published to an exchange + # + # Any published message, regardless of its persistence setting, is thrown + # away by the exchange when there are no queues bound to it. + # + # As part of the AMQP standard, each server _should_ predeclare a headers + # exchange called 'amq.match' (this is not required by the standard). + # Allocating this exchange without a name _or_ with the empty string + # will use the internal 'amq.match' exchange. + # + # TODO: The classic example is ... + # + # When publishing data to the exchange, bound queues subscribing to the + # exchange indicate which data interests them by passing arguments + # for matching against the headers in published messages. The + # form of the matching can be controlled by the 'x-match' argument, which + # may be 'any' or 'all'. If unspecified (in RabbitMQ at least), it defaults + # to "all". + # + # A value of 'all' for 'x-match' implies that all values must match (i.e. + # it does an AND of the headers ), while a value of 'any' implies that + # at least one should match (ie. it does an OR). + # + # TODO: document behavior when either the binding or the message is missing + # a header present in the other + # + # TODO: insert example + # + # == Options + # * :passive => true | false (default false) + # If set, the server will not create the exchange if it does not + # already exist. The client can use this to check whether an exchange + # exists without modifying the server state. + # + # * :durable => true | false (default false) + # If set when creating a new exchange, the exchange will be marked as + # durable. Durable exchanges remain active when a server restarts. + # Non-durable exchanges (transient exchanges) are purged if/when a + # server restarts. + # + # A transient exchange (the default) is stored in memory-only. The + # exchange and all bindings will be lost on a server restart. + # It makes no sense to publish a persistent message to a transient + # exchange. + # + # Durable exchanges and their bindings are recreated upon a server + # restart. Any published messages not routed to a bound queue are lost. + # + # * :auto_delete => true | false (default false) + # If set, the exchange is deleted when all queues have finished + # using it. The server waits for a short period of time before + # determining the exchange is unused to give time to the client code + # to bind a queue to it. + # + # If the exchange has been previously declared, this option is ignored + # on subsequent declarations. + # + # * :internal => true | false (default false) + # If set, the exchange may not be used directly by publishers, but + # only when bound to other exchanges. Internal exchanges are used to + # construct wiring that is not visible to applications. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # + # == Exceptions + # Doing any of these activities are illegal and will raise MQ:Error. + # * redeclare an already-declared exchange to a different type + # * :passive => true and the exchange does not exist (NOT_FOUND) + # * using a value other than "any" or "all" for "x-match" + def headers name = 'amq.match', opts = {} + exchanges[name] ||= Exchange.new(self, :headers, name, opts) + end + # Queues store and forward messages. Queues can be configured in the server # or created at runtime. Queues must be attached to at least one exchange # in order to receive messages from publishers. # # Like an Exchange, queue names starting with 'amq.' are reserved for \ No newline at end of file