# @title Ruby AMQP gem: Working with queues

h1. Working with queues


h2. About this guide

This guide covers everything related to queues in AMQP 0.9.1, common usage scenarios and how to accomplish typical operations using
amqp gem.


h2. Covered versions

This guide covers "Ruby amqp gem":http://github.com/ruby-amqp/amqp v0.8.0 and later.



h2. Queues in AMQP 0.9.1, briefly

h3. What are AMQP queues?

Queues store and forward messages to consumers. They are similar to mailboxes in SMTP.
Messages flow from producing applications to {file:docs/Exchanges.textile exchanges} that route them
to queues and finally queues deliver them to consumer applications (or consumer applications fetch messages as needed).

Note that unlike some other messaging protocols/systems, messages are not delivered directly
to queues. They are delivered to exchanges that route messages to queues using rules
knows as *bindings*.

AMQP is a programmable protocol, so queues and bindings alike are declared by applications.


h3. Concept of bindings

Binding is an association between a queue and an exchange. Queues must be bound to at least one exchange in order to receive messages from publishers.
Learn more about bindings in {file:docs/Bindings.textile Bindings guide}.


h3. Attributes

Queues have several attributes associated with them:

 * Name
 * Exclusivity
 * Durability
 * Whether queue is auto-deleted when no longer used
 * Other metadata (aka X-arguments)

These attributes define how queues can be used, what their lifecycle is like and other aspects of queue
behavior.

amqp gem represents queues as instances of {AMQP::Queue}.

h2. Queue names. Server-named queues. Predefined queues.

Every queue has a name that identifies it. Queue names often contain several segments separated by a dot (.), similarly to how URI
path segments are separated by a slash (/), although it may be almost any string, with some limitations (see below).
Applications may pick queue names or ask broker to generate a name for them. To do so, pass *empty string* as queue name argument.

Here is an example:

<script src="https://gist.github.com/998720.js"> </script>

If you want to declare a queue with a particular name, for example, "images.resize", pass it to Queue class constructor:

<script src="https://gist.github.com/998721.js"> </script>

Queue names starting with 'amq.' are reserved for internal use by the broker. Attempts to declare queue with a name that violates this
rule will result in AMQP::IncompatibleOptionsError to be thrown (when queue is re-declared on the same channel object) or channel-level exception
(when originally queue was declared on one channel and re-declaration with different attributes happens on another channel).
Learn more in Error handling and recovery section below.



h2. Queue life-cycle patterns.

To quote AMQP 0.9.1 spec, there are two common message queue life-cycle patterns:

 * Durable message queues that are shared by many consumers and have an independent existence: i.e. they
   will continue to exist and collect messages whether or not there are consumers to receive them.
 * Temporary message queues that are private to one consumer and are tied to that consumer. When the
   consumer disconnects, the message queue is deleted.

There are some variations of these, such as shared message queues that are deleted when the last of
many consumers disconnects.

One example of durable message queues is well-known services like event collectors (event loggers).
They are usually up whether there are services to log anything or not. Other applications know what
queues they use and can rely on those queues being around all the time, survive broker restarts and
in general be available should an application in the network need to use them. In this case,
explicitly named durable queues are optimal and coupling it creates between applications is not
an issue. Another scenario of a well-known long-lived service is distributed metadata/directory/locking server
like Apache Zookeeper, Google's Chubby or DNS. Services like this benefit from using well-known, not generated
queue names, and so do other applications that use them.

Different scenario is in "a cloud settings" when some kind of workers/instances may come online and
go down basically any time and other applications cannot rely on them being available. Using well-known
queue names in this case is possible but server-generated, short-lived queues that are bound to
topic or fanout exchanges to receive relevant messages is a better idea.

Imagine a service that processes an endless stream of events (Twitter is one example). When traffic goes
up, development operations may spin up additional applications instances in the cloud to handle the load.
Those new instances want to subscribe to receive messages to process but the rest of the system doesn't
know anything about them, rely on them being online or try to address them directly: they process events
from a shared stream and are not different from their peers. In a case like this, there is no reason for
message consumers to not use queue names generated by the broker.

In general, use of explicitly named or server-named queues depends on messaging pattern your application needs.
{http://www.eaipatterns.com/ Enterprise Integration Patters} discusses many messaging patterns in depth.
RabbitMQ FAQ also has a section on {http://www.rabbitmq.com/faq.html#scenarios use cases}.



h2. Declaring a durable shared queue

To declare a durable shared queue, you pass queue name that is a non-blank string and use :durable option:

<script src="https://gist.github.com/998723.js"> </script>

the same piece of code that uses {AMQP::Channel#queue} for convenience:

<script src="https://gist.github.com/998724.js"> </script>



h2. Declaring a temporary exclusive queue

To declare a server-named, exclusive, auto-deleted queue, pass "" (empty string) as queue name and
use :exclusive and :auto_delete options:

<script src="https://gist.github.com/998725.js"> </script>

the same piece of code that uses {AMQP::Channel#queue} for convenience:

<script src="https://gist.github.com/998726.js"> </script>



h2. Binding queues to exchanges

In order to receive messages, a queue needs to be bound to at least one exchange. Most of the time binding is explcit (done by applications).
To bind a queue to an exchange, use {AMQP::Queue#bind}. Argument can be either an {AMQP::Exchange} instance, as demonstrated in this example

<script src="https://gist.github.com/998727.js"> </script>

or an exchange name given as a string:

<script src="https://gist.github.com/998729.js"> </script>




h2. Subscribing to receive messages ("push API")

To subscribe to receive messages when they arrive to the queue ("start a queue consumer"), one uses {AMQP::Queue#subscribe} method.
Then when a message arrives, message header and body (aka payload) are passed to the handler:

<script src="https://gist.github.com/998731.js"> </script>

Subscriptions for message delivery are usually referred to as "consumers" in the AMQP 0.9.1 spec, client libraries documentation and books.
Consumers last as long as the channel they were declared on, or until the client cancels them (unsubscribes).

Consumers are identified by <i>consumer tags</i>. If you need to obtain consumer tag of a queue that is subscribed to receive messages,
use {AMQP::Queue#consumer_tag}.


h3. Accessing message metadata

`header` object in the example above provides access to message metadata and delivery information:

 * Message content type
 * Message content­encoding
 * Message routing key
 * Message delivery mode (persistent or not)
 * Consumer tag this delivery is for
 * Delivery tag
 * Message priority
 * Whether or not message is redelivered
 * Producer application id

and so on. An example to demonstrate how to access some of those attributes:

<script src="https://gist.github.com/998739.js"> </script>


h3. Exclusive consumers

Consumers can request exclusive access to the queue (meaning only this consumer can access the queue). This is useful when you want a long-lived shared
queue to be temporarily accessible by just one application (or thread, or process). If application exclusive consumer is part of crashes or loses
TCP connection to the broker, channel is closed and exclusive consumer is thus cancelled.

To exclusively receive messages from the queue, pass :exclusive option to {AMQP::Queue#subscribe}:

<pre>
<code>
queue.subscribe(:exclusive => true) do |metadata, payload|
  # message handling logic...
end
</code>
</pre>

TBD: describe what happens when exclusivity property is violated and how to handle it.


h3. Message acknowledgements

Consumer applications (applications that receive and process messages) may (and will) occasionally fail to process individual messages, or will just
crash. That's not to mention possible network issues. This raises a question: when should AMQP broker remove messages from queues?  AMQP 0.9.1 lets
you choose one of two answers:

 * After broker sends a message to an application (using either basic.deliver or basic.get-ok methods).
 * After the application sends back an acknowledgement (using basic.ack AMQP method).

The former model is called *automatic acknowledgement model* while the latter is *explicit acknowledgement model*. With the explicit model, application
chooses when it's time to send an ack: it can be right after receiving it, or after persisting it to a data store before processing, or after fully
processing the message (for example, successfully fetching a Web page, processing and storing it into some persistent data store).

If a consumer dies without sending an ack, AMQP broker will redeliver it to another consumer (or, if none are available at the time, it will wait
until at least one consumer is registered for the same queue).

Acknowledgement model is chosen when a new consumer is registered for a queue. By default, {AMQP::Queue#subscribe} will use the *automatic* model.
To switch to the *explicit* model, :ack option should be used:

<pre>
<code>
queue.subscribe(:ack => true) do |metadata, payload|
  # message handling logic...
end
</code>
</pre>

To demonstrate how redelivery works, lets have a look at the following code example:

<script src="https://gist.github.com/999396.js"> </script>

So what is going on here? This example uses 3 AMQP connections to imitate 3 applications, 1 producer and two consumers. Each AMQP connection opens a single
channel:

<pre>
<code>
# open two connections to imitate two apps
connection1 = AMQP.connect
connection2 = AMQP.connect
connection3 = AMQP.connect

channel_exception_handler = Proc.new { |ch, channel_close| EventMachine.stop; raise "channel error: #{channel_close.reply_text}" }

# open two channels
channel1    = AMQP::Channel.new(connection1)
channel1.on_error(&channel_exception_handler)
# ...

channel2    = AMQP::Channel.new(connection2)
channel2.on_error(&channel_exception_handler)
# ...

# app 3 will just publish messages
channel3    = AMQP::Channel.new(connection3)
channel3.on_error(&channel_exception_handler)
</code>
</pre>

Consumers share a queue and producer publishes messages there periodically using `amq.direct` exchange. Both "applications" subscribe to receive messages
using explicit acknowledgement model. AMQP broker by default will send each message to the next consumer, in sequence
(this kind of load balancing is knownas *round-robin*). So some messages will be delivered to consumer #1 and some to consumer #2.

<pre>
<code>
exchange = channel3.direct("amq.direct")

# ...

queue1    = channel1.queue("amqpgem.examples.acknowledgements.explicit", :auto_delete => false)
# purge the queue so that we don't get any redeliveries from previous runs
queue1.purge
queue1.bind(exchange).subscribe(:ack => true) do |metadata, payload|
  # do some work
  sleep(0.2)

  # acknowledge some messages, they will be removed from the queue
  if rand > 0.5
    # FYI: there is a shortcut, metadata.ack
    channel1.acknowledge(metadata.delivery_tag, false)
    puts "[consumer1] Got message ##{metadata.headers['i']}, ack-ed"
  else
    # odd messages are not ack-ed and will remain in the queue for redelivery
    # when app #1 connection is closed (either properly or due to a crash)
    puts "[consumer1] Got message ##{metadata.headers['i']}, SKIPPPED"
  end
end

queue2    = channel2.queue!("amqpgem.examples.acknowledgements.explicit", :auto_delete => false)
queue2.subscribe(:ack => true) do |metadata, payload|
  metadata.ack
  # app 2 always acks messages
  puts "[consumer2] Received #{payload}, redelivered = #{metadata.redelivered}, ack-ed"
end
</code>
</pre>

To demonstrate message redelivery we make consumer #1 randomly select what messages to acknowledge. After 4 seconds we disconnect it (to imitate
a crash). When that happens, AMQP broker redelivers unacknowledged messages to the consumer #2 which acknowledges them unconditionally. After 10 seconds,
this example closes all outstanding connections and exits.

An example output produced by this example:

<pre>
=> Subscribing for messages using explicit acknowledgements model

[consumer2] Received Message #0, redelivered = false, ack-ed
[consumer1] Got message #1, SKIPPPED
[consumer1] Got message #2, SKIPPPED
[consumer1] Got message #3, ack-ed
[consumer2] Received Message #4, redelivered = false, ack-ed
[consumer1] Got message #5, SKIPPPED
[consumer2] Received Message #6, redelivered = false, ack-ed
[consumer2] Received Message #7, redelivered = false, ack-ed
[consumer2] Received Message #8, redelivered = false, ack-ed
[consumer2] Received Message #9, redelivered = false, ack-ed
[consumer2] Received Message #10, redelivered = false, ack-ed
[consumer2] Received Message #11, redelivered = false, ack-ed
----- Connection 1 is now closed (we pretend that it has crashed) -----
[consumer2] Received Message #5, redelivered = true, ack-ed
[consumer2] Received Message #1, redelivered = true, ack-ed
[consumer2] Received Message #2, redelivered = true, ack-ed
[consumer2] Received Message #12, redelivered = false, ack-ed
[consumer2] Received Message #13, redelivered = false, ack-ed
[consumer2] Received Message #14, redelivered = false, ack-ed
[consumer2] Received Message #15, redelivered = false, ack-ed
[consumer2] Received Message #16, redelivered = false, ack-ed
[consumer2] Received Message #17, redelivered = false, ack-ed
[consumer2] Received Message #18, redelivered = false, ack-ed
[consumer2] Received Message #19, redelivered = false, ack-ed
[consumer2] Received Message #20, redelivered = false, ack-ed
[consumer2] Received Message #21, redelivered = false, ack-ed
[consumer2] Received Message #22, redelivered = false, ack-ed
[consumer2] Received Message #23, redelivered = false, ack-ed
[consumer2] Received Message #24, redelivered = false, ack-ed
[consumer2] Received Message #25, redelivered = false, ack-ed
[consumer2] Received Message #26, redelivered = false, ack-ed
[consumer2] Received Message #27, redelivered = false, ack-ed
[consumer2] Received Message #28, redelivered = false, ack-ed
[consumer2] Received Message #29, redelivered = false, ack-ed
[consumer2] Received Message #30, redelivered = false, ack-ed
[consumer2] Received Message #31, redelivered = false, ack-ed
[consumer2] Received Message #32, redelivered = false, ack-ed
[consumer2] Received Message #33, redelivered = false, ack-ed
[consumer2] Received Message #34, redelivered = false, ack-ed
[consumer2] Received Message #35, redelivered = false, ack-ed
</pre>

As we can see, consumer #1 did not acknowledge 3 messages (labelled 1, 2 and 5):

<pre>
[consumer1] Got message #1, SKIPPPED
[consumer1] Got message #2, SKIPPPED
...
[consumer1] Got message #5, SKIPPPED
</pre>

and then, once consumer #1 had "crashed", those messages were immediately redelivered to the consumer #2:

<pre>
Connection 1 is now closed (we pretend that it has crashed)
[consumer2] Received Message #5, redelivered = true, ack-ed
[consumer2] Received Message #1, redelivered = true, ack-ed
[consumer2] Received Message #2, redelivered = true, ack-ed
</pre>

To acknowledge a message, use {AMQP::Channel#acknowledge}:

<pre>
<code>
channel1.acknowledge(metadata.delivery_tag, false)
</code>
</pre>

{AMQP::Channel#acknowledge} takes two arguments: message *delivery tag* and a flag that indicates whether or not we acknowledge
multiple messages at once. Delivery tag is simply a channel-specific increasing number that server uses to identify deliveries.

When acknowledging multiple messages at once, the delivery tag is treated as "up to and including". For example, for delivery
tag = 5 that would mean "acknowledge messages 1, 2, 3, 4 and 5".

As a shortcut, it is possible to acknowledge messages using {AMQP::Header#ack} method:

<pre>
<code>
queue2.subscribe(:ack => true) do |metadata, payload|
  metadata.ack
end
</code>
</pre>

<span class="note">
Acknowledgements are channel-specific. Applications must not receive messages on one channel and acknowledge them on another channel.
</span>

<span class="note">
A message MUST not be acknowledged more than once. Doing so will result in a channel-level exception (PRECONDITION_FAILED)
with an error message like this: «PRECONDITION_FAILED - unknown delivery tag»
</span>



h3. Rejecting messages.

When a consumer application receives a message, processing of that message may or may not succeed. Application can
indicate it to the broker that message processing has failed (or cannot be accomplished at the time) by rejecting a message.
While rejecting a message, application can ask broker to discard or requeue it.

To reject a message, use {AMQP::Channel#reject} method:

<pre>
<code>
queue.bind(exchange).subscribe do |metadata, payload|
  # reject but don't requeue (simply discard)
  channel.reject(metadata.delivery_tag)
end
</code>
</pre>

in the example above messages are rejected without requeueing (broker will simply discard them). To requeue rejected message,
use the second argument {AMQP::Channel#reject} takes:

<pre>
<code>
queue.bind(exchange).subscribe do |metadata, payload|
  # reject & requeue
  channel.reject(metadata.delivery_tag, true)
end
</code>
</pre>

<span class="note">
When there is only one consumer on a queue, make sure you don't create infinite message delivery loops by rejecting & requeueing
message from that consumer over and over.
</span>

Another way to reject a message is by using {AMQP::Header#reject}:

<pre>
<code>
queue.bind(exchange).subscribe do |metadata, payload|
  # reject but don't requeue (simply discard)
  metadata.reject
end
</code>
</pre>

<pre>
<code>
queue.bind(exchange).subscribe do |metadata, payload|
  # reject & requeue
  metadata.reject(true)
end
</code>
</pre>


h3. Negative acknowledgements.

Messages are rejected with `basic.reject` AMQP method. There is one limitation `basic.reject` has: there is no way to reject multiple
messages, like you can do with acknowledgements. If you are using "RabbitMQ":http://rabbitmq.com, there is a solution: RabbitMQ provides
an AMQP 0.9.1 extension known as "negative acknowledgements":http://www.rabbitmq.com/extensions.html#negative-acknowledgements (nacks) and
amqp gem supports it. For more information, please refer to the {file:docs/VendorSpecificExtensions.textile Vendor-specific Extensions guide}.


h3. QoS. Prefetching messages.

For cases when multiple consumers share a queue, it is useful to be able to specify how many messages each consumer can be sent at once
(before sending the next acknowledgement). This can be used as a simple load balancing technique or to improve throughput if messages tend
to be published in batches (for example, if producing application sends them every minute because of the nature of the work it is doing).

Imagine a Website that takes data from social media sources like Twitter or Facebook during the Champions League final (or the Superbowl), and then
calculates how many tweets mentioned a particular team over the last minute. It can be structured as 3 applications:

 * A crawler that uses streaming APIs to fetch tweets/statuses, normalizes them and sends them in JSON for processing by other applications ("app A").
 * A calculator that detects what team is mentioned in a message, updates statistics and pushes an update to the Web UI once a minute ("app B").
 * A Web UI that fans visit to see the stats ("app C").

In this imaginary example, tweets per second rate will vary but to improve throughput of the system and decrease maximum number of messages AMQP broker
has to hold in memory at once, applications can be designed in such a way that application "app B", the "calculator", receives 5000 messages and then
acknowledges them all at once. Broker will not send message 5001 to it unless it receives an acknowledgement for #1, either individually or in bulk
with other messages.

In AMQP parlance this is know as *QoS* or *message prefetching*. Prefetching is configured on per-channel (typically) or per-connection (rarely used)
basis. To configure prefetching per channel, use {AMQP::Channel#prefetch} method. Lets come back to the example we used in the "Message acknowledgements"
section:

<pre>
<code>
# first app will be given up to 3 messages at a time. If it doesn't
# ack any messages after it was delivered 3, messages will be routed to
# the app #2.
channel1.prefetch(3)

# app #2 processes messages one-by-one and has to send and ack every time
channel2.prefetch(1)
</code>
</pre>

In that example, one consumer prefetches 3 messages and another consumer prefetches just 1. If we take a look at the output that example produces,
we will see that `consumer1` fetched 4 messages and acknowledged 1. After that, all subsequent messages were delivered to the `consumer2`:

<pre>
[consumer2] Received Message #0, redelivered = false, ack-ed
[consumer1] Got message #1, SKIPPPED
[consumer1] Got message #2, SKIPPPED
[consumer1] Got message #3, ack-ed
[consumer2] Received Message #4, redelivered = false, ack-ed
[consumer1] Got message #5, SKIPPPED
--- by now consumer 1 has received 3 messages it did not acknowledge. With prefetch = 3, AMQP broker won't send it any more messages until consumer 1 sends an ack ---
[consumer2] Received Message #6, redelivered = false, ack-ed
[consumer2] Received Message #7, redelivered = false, ack-ed
[consumer2] Received Message #8, redelivered = false, ack-ed
[consumer2] Received Message #9, redelivered = false, ack-ed
[consumer2] Received Message #10, redelivered = false, ack-ed
[consumer2] Received Message #11, redelivered = false, ack-ed
</pre>

<span class="note">
Prefetching setting is ignored for consumers that do not use explicit acknowledgements
</span>



h2. Fetching messages when needed ("pull API")

AMQP 0.9.1 also provides a way for applications to fetch (pull) messages from the queue only when necessary. For that, use
{AMQP::Queue#pop}:

<script src="https://gist.github.com/998732.js"> </script>

TBD


h2. Unsubscribing from messages

Sometimes it is necessary to unsubscribe from messages without deleting a queue. To do that, use {AMQP::Queue#unsubscribe} method:

<script src="https://gist.github.com/998734.js"> </script>

By default {AMQP::Queue#unsubscribe} uses :noack option to inform broker that there is no need to send a
confirmation. In other words, it does not expect you to pass in a callback, because consumer tag on the queue instance and registered
callback for messages are cleared immediately.


h2. Unbinding queues from exchanges

To unbind queue from exchange, use {AMQP::Queue#unbind}:

<script src="https://gist.github.com/998742.js"> </script>

Note that unbinding an exchange queue was never bound to will result in a channel-level exception.


h2. Purging queues

It is possible to purge a queue (remove all messages from it) using {AMQP::Queue#purge}:

<script src="https://gist.github.com/998743.js"> </script>


This method takes a callback but it is optional. However, remember that this operation is performed asynchronously.



h2. Deleting queues

To delete a queue, use {AMQP::Queue#delete}:

<script src="https://gist.github.com/998744.js"> </script>

This method takes a callback but it is optional. However, remember that this operation is performed asynchronously.

When queue is deleted, all the messages in it are deleted as well.



h2. Queue durability vs Message durability

See {file:docs/Durability.textile Durability guide}



h2. Error handling and recovery

See {file:docs/ErrorHandling.textile Error handling and recovery guide}



h2. Vendor-specific extensions related to queues

See {file:docs/VendorSpecificExtensions.textile Vendor-specific Extensions guide}



h2.  What to read next

Documentation is organized as several {file:docs/DocumentationGuidesIndex.textile documentation guides}, covering all kinds of
topics. Guides related to this one are

 * {file:docs/Exchanges.textile Exchanges}
 * {file:docs/Bindings.textile Bindings}
 * {file:docs/ErrorHandling.textile Error handling and recovery}

RabbitMQ implements a number of extensions to AMQP 0.9.1 functionality, covered in the {file:docs/VendorSpecificExtensions.textile Vendor-specific Extensions guide}.
At least one extension, per-queue messages time-to-live (TTL), is related to this guide and can be used with amqp gem 0.8.0 and later.



h2. Tell us what you think!

Please take a moment and tell us what you think about this guide on "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp:
what was unclear? what wasn't covered? maybe you don't like guide style or grammar and spelling are incorrect? Readers feedback is
key to making documentation better.

If mailing list communication is not an option for you for some reason, you can "contact guides author directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation


<div id="disqus_thread"></div>
<script type="text/javascript">
    /* * * CONFIGURATION VARIABLES * * */
    var disqus_shortname = 'rubyamqpdocs'; // required: replace example with your forum shortname

    var disqus_developer = 0; // set to 1 on local machine for testing comments
    var disqus_identifier = 'amqp_queues';
    var disqus_url = 'http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/Queues.textile';

    /* * * DON'T EDIT BELOW THIS LINE * * */
    (function() {
        var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
        dsq.src = 'http://' + disqus_shortname + '.disqus.com/embed.js';
        (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
    })();
</script>