lib/amqp/queue.rb in amqp-0.8.0.rc2 vs lib/amqp/queue.rb in amqp-0.8.0.rc3

- old
+ new

@@ -8,31 +8,87 @@ # Queues store and forward messages to consumers. They similar to mailboxes in SMTP. # Messages flow from producing applications to {Exchange exchanges} that route them # to queues and finally queues deliver them to consumer applications (or consumer # applications fetch messages as needed). # + # Note that unlike some other messaging protocols/systems, messages are not delivered directly + # to queues. They are delivered to exchanges that route messages to queues using rules + # knows as *bindings*. # + # # h2. Concept of bindings # + # Binding is an association between a queue and an exchange. # Queues must be bound to at least one exchange in order to receive messages from publishers. # Learn more about bindings in {Exchange Exchange class documentation}. # # + # h2. Key methods + # + # Key methods of Queue class are + # + # * {Queue#bind} + # * {Queue#subscribe} + # * {Queue#pop} + # * {Queue#delete} + # * {Queue#purge} + # * {Queue#unbind} + # + # # h2. Queue names. Server-named queues. Predefined queues. # - # Like an Exchange, queue names starting with 'amq.' are reserved for - # internal use. Attempts to create queue names in violation of this - # reservation will raise AMQP::Error (ACCESS_REFUSED). + # Every queue has a name that identifies it. Queue names often contain several segments separated by a dot (.), similarly to how URI + # path segments are separated by a slash (/), although it may be almost any string, with some limitations (see below). + # Applications may pick queue names or ask broker to generate a name for them. To do so, pass *empty string* as queue name argument. # - # When a queue is created without a name, the server will generate a - # unique name internally (not currently supported in this library). + # Here is an example: # + # @example Declaring a server-named queue using AMQP::Queue constructor + # AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok| + # AMQP::Channel.new do |channel, open_ok| + # AMQP::Queue.new(channel, "", :auto_delete => true) do |queue, declare_ok| + # puts "#{queue.name} is ready to go. AMQP method: #{declare_ok.inspect}" + # + # connection.close { + # EM.stop { exit } + # } + # end + # end + # end + # + # <script src="https://gist.github.com/939596.js?file=gistfile1.rb"></script> + # + # If you want to declare a queue with a particular name, for example, "images.resize", pass it to Queue class constructor: + # + # @example Declaring a server-named queue using AMQP::Queue constructor + # AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok| + # AMQP::Channel.new do |channel, open_ok| + # AMQP::Queue.new(channel, "images.resize", :auto_delete => true) do |queue, declare_ok| + # puts "#{queue.name} is ready to go." + # + # connection.close { + # EM.stop { exit } + # } + # end + # end + # end + # + # <script src="https://gist.github.com/939600.js?file=gistfile1.rb"></script> + # + # Queue names starting with 'amq.' are reserved for internal use by the broker. Attempts to declare queue with a name that violates this + # rule will result in AMQP::IncompatibleOptionsError to be thrown (when + # queue is re-declared on the same channel object) or channel-level exception (when originally queue + # was declared on one channel and re-declaration with different attributes happens on another channel). + # Learn more in {file:docs/Queues.textile Queues guide} and {file:docs/ErrorHandling.textile Error Handling guide}. + # + # + # # h2. Queue life-cycles. When use of server-named queues is optimal and when it isn't. # # To quote AMQP 0.9.1 spec, there are two common message queue life-cycles: # - # * Durable message queues that are shared by many consumers and have an independent existence: i.e. they + # * Durable message queues that are shared by many consumers and have an independent existence: i.e. they # will continue to exist and collect messages whether or not there are consumers to receive them. # * Temporary message queues that are private to one consumer and are tied to that consumer. When the # consumer disconnects, the message queue is deleted. # # There are some variations on these, such as shared message queues that are deleted when the last of @@ -76,30 +132,25 @@ # are not lost. Simply publishing message to a durable exchange or the fact that queue(s) they are routed to # is durable doesn't make messages persistent: it all depends on persistence mode of the messages itself. # Publishing messages as persistent affects performance (just like with data stores, durability comes at a certain cost # in performance and vise versa). Pass :persistent => true to {Exchange#publish} to publish your message as persistent. # - # Note that *only durable queues can be bound to durable exchanges*. + # Note that *only durable queues can be bound to durable exchanges*. Learn more in our {file:docs/Durability.textile Durability guide}. # # # h2. Message ordering # # RabbitMQ FAQ explains {http://www.rabbitmq.com/faq.html#message-ordering ordering of messages in AMQP queues} # # - # h2. Key methods + # h2. Error handling # - # Key methods of Queue class are + # When channel-level error occurs, queues associated with that channel are reset: internal state and callbacks + # are cleared. Recommended strategy is to open a new channel and re-declare all the entities you need. + # Learn more in {file:docs/ErrorHandling.textile Error Handling guide}. # - # * {Queue#bind} - # * {Queue#subscribe} - # * {Queue#pop} - # * {Queue#delete} - # * {Queue#purge} - # * {Queue#unbind} # - # # @note Please make sure you read a section on queue durability vs. messages # persistence. # # # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.1) @@ -110,15 +161,12 @@ # API # # Name of this queue attr_reader :name - attr_reader :sync_bind # Options this queue object was instantiated with attr_accessor :opts - attr_accessor :on_declare - attr_accessor :on_bind # @option opts [Boolean] :passive (false) If set, the server will not create the queue if it does not # already exist. The client can use this to check whether the queue @@ -161,23 +209,31 @@ # @api public def initialize(channel, name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block) @channel = channel name = AMQ::Protocol::EMPTY_STRING if name.nil? @name = name unless name.empty? - @opts = self.class.add_default_options(name, opts, block) - @bindings = Hash.new + @server_named = name.empty? + @opts = self.class.add_default_options(name, opts, block) + @bindings = Hash.new + # a deferrable that we use to delay operations until this queue is actually declared. + # one reason for this is to support a case when a server-named queue is immediately bound. + # it's crazy, but 0.7.x supports it, so... MK. + @declaration_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + if @opts[:nowait] @status = :opened block.call(self) if block else @status = :opening end super(channel.connection, channel, name) shim = Proc.new do |q, declare_ok| + @declaration_deferrable.succeed + case block.arity when 1 then block.call(q) else block.call(q, declare_ok) end @@ -185,16 +241,24 @@ @channel.once_open do if block self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments], &shim) else - self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], @opts[:nowait], @opts[:arguments]) + injected_callback = Proc.new { @declaration_deferrable.succeed } + # we cannot pass :nowait as true here, AMQ::Client::Queue will (rightfully) raise an exception because + # it has no idea about crazy edge cases we are trying to support for sake of backwards compatibility. MK. + self.declare(@opts[:passive], @opts[:durable], @opts[:exclusive], @opts[:auto_delete], false, @opts[:arguments], &injected_callback) end end end + # @return [Boolean] true if this queue is server-named + def server_named? + @server_named + end # server_named? + # This method binds a queue to an exchange. Until a queue is # bound it will not receive any messages. In a classic messaging # model, store-and-forward queues are bound to a dest exchange # and subscription queues are bound to a dest_wild exchange. # @@ -232,18 +296,25 @@ # # @api public # @see Queue#unbind def bind(exchange, opts = {}, &block) @status = :unbound - @sync_bind = !opts[:nowait] # amq-client's Queue already does exchange.respond_to?(:name) ? exchange.name : exchange # for us exchange = exchange @bindings[exchange] = opts - @channel.once_open do - super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) + if self.server_named? + @channel.once_open do + @declaration_deferrable.callback do + super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) + end + end + else + @channel.once_open do + super(exchange, (opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING), (opts[:nowait] || block.nil?), opts[:arguments], &block) + end end self end @@ -595,17 +666,9 @@ # # @api public # @deprecated def callback @on_declare - end - - # Compatibility alias for #on_bind. - # - # @api public - # @deprecated - def bind_callback - @on_bind end