h1. AMQP queues in detail h2. About this guide This guide covers everything related to queues in AMQP 0.9.1, common usage scenarios and how to accomplish typical operations using amqp gem. h2. Covered versions This guide covers amqp gem v0.8.0 and later. h2. Queues in AMQP 0.9.1, briefly h3. What are AMQP queues? Queues store and forward messages to consumers. They are similar to mailboxes in SMTP. Messages flow from producing applications to {file:docs/Exchanges.textile exchanges} that route them to queues and finally queues deliver them to consumer applications (or consumer applications fetch messages as needed). Note that unlike some other messaging protocols/systems, messages are not delivered directly to queues. They are delivered to exchanges that route messages to queues using rules knows as *bindings*. AMQP is a programmable protocol, so queues and bindings alike are declared by applications. h3. Concept of bindings Binding is an association between a queue and an exchange. Queues must be bound to at least one exchange in order to receive messages from publishers. Learn more about bindings in {file:docs/Bindings.textile Bindings guide}. h3. Attributes Queues have several attributes associated with them: * Name * Exclusivity * Whether queue is auto-deleted when no longer used * Other metadata (aka X-arguments) These attributes define how queues can be used, what their lifecycle is like and other aspects of queue behavior. amqp gem represents queues as instances of {AMQP::Queue}. h2. Queue names. Server-named queues. Predefined queues. Every queue has a name that identifies it. Queue names often contain several segments separated by a dot (.), similarly to how URI path segments are separated by a slash (/), although it may be almost any string, with some limitations (see below). Applications may pick queue names or ask broker to generate a name for them. To do so, pass *empty string* as queue name argument. Here is an example:
# Declaring a server-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
AMQP::Queue.new(channel, "", :auto_delete => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go. AMQP method: #{declare_ok.inspect}"
connection.close {
EventMachine.stop { exit }
}
end
end
end
If you want to declare a queue with a particular name, for example, "images.resize", pass it to Queue class constructor:
# Declaring a server-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
AMQP::Queue.new(channel, "images.resize", :auto_delete => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go."
connection.close {
EventMachine.stop { exit }
}
end
end
end
Queue names starting with 'amq.' are reserved for internal use by the broker. Attempts to declare queue with a name that violates this
rule will result in AMQP::IncompatibleOptionsError to be thrown (when queue is re-declared on the same channel object) or channel-level exception
(when originally queue was declared on one channel and re-declaration with different attributes happens on another channel).
Learn more in Error handling and recovery section below.
h2. Common usage scenarios
h2. Queue life-cycles. When use of server-named queues is optimal and when it isn't.
To quote AMQP 0.9.1 spec, there are two common message queue life-cycles:
* Durable message queues that are shared by many consumers and have an independent existence: i.e. they
will continue to exist and collect messages whether or not there are consumers to receive them.
* Temporary message queues that are private to one consumer and are tied to that consumer. When the
consumer disconnects, the message queue is deleted.
There are some variations on these, such as shared message queues that are deleted when the last of
many consumers disconnects.
One example of durable message queues is well-known services like event collectors (event loggers).
They are usually up whether there are services to log anything or not. Other applications know what
queues they use and can rely on those queues being around all the time, survive broker restarts and
in general be available should an application in the network need to use them. In this case,
explicitly named durable queues are optimal and coupling it creates between applications is not
an issue. Another scenario of a well-known long-lived service is distributed metadata/directory/locking server
like Apache Zookeeper, Google's Chubby or DNS. Services like this benefit from using well-known, not generated
queue names, and so do other applications that use them.
Different scenario is in "a cloud settings" when some kind of workers/instances may come online and
go down basically any time and other applications cannot rely on them being available. Using well-known
queue names in this case is possible but server-generated, short-lived queues that are bound to
topic or fanout exchanges to receive relevant messages is a better idea.
Imagine a service that processes an endless stream of events (Twitter is one example). When traffic goes
up, development operations may spin up additional applications instances in the cloud to handle the load.
Those new instances want to subscribe to receive messages to process but the rest of the system doesn't
know anything about them, rely on them being online or try to address them directly: they process events
from a shared stream and are not different from their peers. In a case like this, there is no reason for
message consumers to not use queue names generated by the broker.
In general, use of explicitly named or server-named queues depends on messaging pattern your application needs.
{http://www.eaipatterns.com/ Enterprise Integration Patters} discusses many messaging patterns in depth.
RabbitMQ FAQ also has a section on {http://www.rabbitmq.com/faq.html#scenarios use cases}.
h2. Declaring a durable shared queue
To declare a durable shared queue, you pass queue name that is a non-blank string and use :durable option:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
# Declaring a client-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
AMQP::Queue.new(channel, "images.resize", :durable => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go."
connection.close {
EventMachine.stop { exit }
}
end
end
end
the same piece of code that uses {AMQP::Channel#queue} for convenience:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
# Declaring a client-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
channel.queue("images.resize", :durable => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go."
connection.close {
EventMachine.stop { exit }
}
end
end
end
h2. Declaring a temporary exclusive queue
To declare a server-named, exclusive, auto-deleted queue, pass "" (empty string) as queue name and
use :exclusive and :auto_delete options:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
# Declaring a server-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
AMQP::Queue.new(channel, "", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go."
connection.close {
EventMachine.stop { exit }
}
end
end
end
the same piece of code that uses {AMQP::Channel#queue} for convenience:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
# Declaring a server-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go."
connection.close {
EventMachine.stop { exit }
}
end
end
end
h2. Binding queues to exchanges
In order to receive messages, a queue needs to be bound to at least one exchange. Most of the time binding is explcit (done by applications).
To bind a queue to an exchange, use {AMQP::Queue#bind). Argument can be either an {AMQP::Exchange} instance or exchange name:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
# Binding a queue to an exchange
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
exchange = channel.fanout("amq.fanout")
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
queue.bind(exchange) do |bind_ok|
puts "Just bound #{queue.name} to #{exchange.name}"
end
connection.close {
EventMachine.stop { exit }
}
end
end
end
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
# Binding a queue to an exchange
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
exchange_name = "amq.fanout"
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
queue.bind(exchange_name) do |bind_ok|
puts "Just bound #{queue.name} to #{exchange_name}"
end
connection.close {
EventMachine.stop { exit }
}
end
end
end
h2. Subscribing to receive messages ("push API")
Each queue usually has one or more consumers (message handlers). Without it, queues are not very useful, right?
To subscribe to receive messages when they arrive to the queue ("start a queue consumer"), one uses {AMQP::Queue#subscribe} method.
Then when a message arrives, message header and body (aka payload) are passed to handling block:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
exchange = channel.fanout("amq.fanout")
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
queue.bind(exchange).subscribe do |headers, payload|
puts "Received a message: #{payload.inspect}. Shutting down..."
connection.close {
EM.stop { exit }
}
end
EventMachine.add_timer(0.2) do
exchange.publish("Ohai!")
end
end
end
end
In books, articles and documentation about AMQP 0.9.1 you may come around discussions of _consumer tags_. Consumer tag in AMQP
parlance is an identifier for subscription: most often, it is used to unsubscribe from messages (more on that later in this chapter).
If you need to obtain consumer tag of a queue that is subscribed to receive messages, use {AMQP::Queue#consumer_tag}.
h3. Exclusive consumers
TBD
h2. Fetching messages when needed ("pull API")
AMQP 0.9.1 also provides a way for applications to fetch (pull) messages from the queue only when necessary. For that, use
{AMQP::Queue#pop}:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
exchange = channel.fanout("amq.fanout")
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
queue.bind(exchange) do |_|
puts "Bound. Publishing a message..."
exchange.publish("Ohai!")
end
EventMachine.add_timer(0.5) do
queue.pop do |response|
puts "Fetched a message: #{response.inspect}. Shutting down..."
connection.close {
EM.stop { exit }
}
end
end
end
end
end
TBD
h2. Unsubscribing from messages
Sometimes it is necessary to unsubscribe from messages without deleting a queue. To do that, use {AMQP::Queue#unsubscribe} method:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
exchange = channel.fanout("amq.fanout")
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
queue.bind(exchange).subscribe do |headers, payload|
puts "Received a new message"
end
EventMachine.add_timer(0.3) do
queue.unsubscribe
puts "Unsubscribed. Shutting down..."
connection.close {
EM.stop { exit }
}
end # EventMachine.add_timer
end # channel.queue
end
end
By default {AMQP::Queue#unsubscribe} uses :noack option to inform broker that there is no need to send a
confirmation. In other words, it does not expect you to pass in a callback, because consumer tag and registered
callbacks are cleared immediately.
h2. Unbinding queues from exchanges
To unbind queue from exchange, use {AMQP::Queue#unbind}:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
puts "Connected"
AMQP::Channel.new(connection) do |channel, open_ok|
puts "Opened a channel"
channel.on_error do |arg|
raise "Channel-level exception!"
end
exchange = channel.fanout("amq.fanout")
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
queue.bind(exchange) do |_|
puts "Bound"
end
EventMachine.add_timer(0.5) do
queue.unbind(exchange) do |_|
puts "Unbound. Shutting down..."
connection.close {
EM.stop { exit }
}
end
end # EventMachine.add_timer
end # channel.queue
end
end
Note that unbinding an exchange queue was never bound to will result in an exception.
h2. Purging queues
It is possible to purge (remove all messages from) a queue using {AMQP::Queue#purge):
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
puts "Connected"
AMQP::Channel.new(connection) do |channel, open_ok|
puts "Opened a channel"
channel.on_error do |arg|
raise "Channel-level exception!"
end
exchange = channel.fanout("amq.fanout")
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
queue.purge do |_|
puts "Queue now has no messages"
end
EventMachine.add_timer(0.5) do
connection.close {
EM.stop { exit }
}
end # EventMachine.add_timer
end # channel.queue
end
end
Callback is optional. However, remember that this operation takes some time.
h2. Deleting queues
To delete a queue, use {AMQP::Queue#delete}:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
puts "Connected"
AMQP::Channel.new(connection) do |channel, open_ok|
puts "Opened a channel"
channel.on_error do |arg|
raise "Channel-level exception!"
end
exchange = channel.fanout("amq.fanout")
channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
EventMachine.add_timer(0.5) do
queue.delete do
puts "Deleted a queue"
connection.close {
EM.stop { exit }
}
end
end # EventMachine.add_timer
end # channel.queue
end
end
Callback can be omitted. However, remember that this operation takes some time.
h2. Queue durability vs Message durability
See {file:docs/Durability.textile Durability guide}
h2. Error handling and recovery
TBD
h2. Vendor-specific extensions related to queues
TBD
h2. What to read next
TBD
h2. Tell us what you think!
Please take a moment and tell us what you think about this guide on "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp:
what was unclear? what wasn't covered? maybe you don't like guide style or grammar and spelling are incorrect? Readers feedback is
key to making documentation better.
If mailing list communication is not an option for you for some reason, you can "contact guides author directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation