lib/mq.rb in amqp-0.6.0 vs lib/mq.rb in amqp-0.6.4

- old
+ new

@@ -143,11 +143,11 @@ conn.callback{ |c| @channel = c.add_channel(self) send Protocol::Channel::Open.new } end - attr_reader :channel + attr_reader :channel, :connection # May raise a MQ::Error exception when the frame payload contains a # Protocol::Channel::Close object. # # This usually occurs when a client attempts to perform an illegal @@ -213,12 +213,15 @@ @consumer = consumers[ method.consumer_tag ] MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer end when Protocol::Basic::GetEmpty - @consumer = get_queue{|q| q.shift } - @consumer.receive nil, nil + if @consumer = get_queue{|q| q.shift } + @consumer.receive nil, nil + else + MQ.error "Basic.GetEmpty for invalid consumer" + end when Protocol::Channel::Close raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}" when Protocol::Channel::CloseOk @@ -515,11 +518,91 @@ # * :passive => true and the exchange does not exist (NOT_FOUND) # def topic name = 'amq.topic', opts = {} exchanges[name] ||= Exchange.new(self, :topic, name, opts) end - + + # Defines, intializes and returns an Exchange to act as an ingress + # point for all published messages. + # + # == Headers + # A headers exchange allows for messages to be published to an exchange + # + # Any published message, regardless of its persistence setting, is thrown + # away by the exchange when there are no queues bound to it. + # + # As part of the AMQP standard, each server _should_ predeclare a headers + # exchange called 'amq.match' (this is not required by the standard). + # Allocating this exchange without a name _or_ with the empty string + # will use the internal 'amq.match' exchange. + # + # TODO: The classic example is ... + # + # When publishing data to the exchange, bound queues subscribing to the + # exchange indicate which data interests them by passing arguments + # for matching against the headers in published messages. The + # form of the matching can be controlled by the 'x-match' argument, which + # may be 'any' or 'all'. If unspecified (in RabbitMQ at least), it defaults + # to "all". + # + # A value of 'all' for 'x-match' implies that all values must match (i.e. + # it does an AND of the headers ), while a value of 'any' implies that + # at least one should match (ie. it does an OR). + # + # TODO: document behavior when either the binding or the message is missing + # a header present in the other + # + # TODO: insert example + # + # == Options + # * :passive => true | false (default false) + # If set, the server will not create the exchange if it does not + # already exist. The client can use this to check whether an exchange + # exists without modifying the server state. + # + # * :durable => true | false (default false) + # If set when creating a new exchange, the exchange will be marked as + # durable. Durable exchanges remain active when a server restarts. + # Non-durable exchanges (transient exchanges) are purged if/when a + # server restarts. + # + # A transient exchange (the default) is stored in memory-only. The + # exchange and all bindings will be lost on a server restart. + # It makes no sense to publish a persistent message to a transient + # exchange. + # + # Durable exchanges and their bindings are recreated upon a server + # restart. Any published messages not routed to a bound queue are lost. + # + # * :auto_delete => true | false (default false) + # If set, the exchange is deleted when all queues have finished + # using it. The server waits for a short period of time before + # determining the exchange is unused to give time to the client code + # to bind a queue to it. + # + # If the exchange has been previously declared, this option is ignored + # on subsequent declarations. + # + # * :internal => true | false (default false) + # If set, the exchange may not be used directly by publishers, but + # only when bound to other exchanges. Internal exchanges are used to + # construct wiring that is not visible to applications. + # + # * :nowait => true | false (default true) + # If set, the server will not respond to the method. The client should + # not wait for a reply method. If the server could not complete the + # method it will raise a channel or connection exception. + # + # == Exceptions + # Doing any of these activities are illegal and will raise MQ:Error. + # * redeclare an already-declared exchange to a different type + # * :passive => true and the exchange does not exist (NOT_FOUND) + # * using a value other than "any" or "all" for "x-match" + def headers name = 'amq.match', opts = {} + exchanges[name] ||= Exchange.new(self, :headers, name, opts) + end + # Queues store and forward messages. Queues can be configured in the server # or created at runtime. Queues must be attached to at least one exchange # in order to receive messages from publishers. # # Like an Exchange, queue names starting with 'amq.' are reserved for @@ -645,9 +728,13 @@ if blk @error_callback = blk else @error_callback.call(msg) if @error_callback and msg end + end + + def prefetch(size) + send Protocol::Basic::Qos.new(:prefetch_size => 0, :prefetch_count => size, :global => false) end # Returns a hash of all the exchange proxy objects. # # Not typically called by client code. \ No newline at end of file