h1. AMQP queues in detail

h2. About this guide

This guide covers everything related to queues in AMQP 0.9.1, common usage scenarios and how to accomplish typical operations using amqp gem.

h2. Covered versions

This guide covers amqp gem v0.8.0 and later.

h2. Queues in AMQP 0.9.1, briefly

h3. What are AMQP queues?

Queues store and forward messages to consumers. They are similar to mailboxes in SMTP. Messages flow from producing applications to {file:docs/Exchanges.textile exchanges} that route them to queues, and finally queues deliver them to consumer applications (or consumer applications fetch messages as needed).

Note that unlike some other messaging protocols/systems, messages are not delivered directly to queues. They are delivered to exchanges that route messages to queues using rules known as *bindings*.

AMQP is a programmable protocol, so queues and bindings alike are declared by applications.

h3. Concept of bindings

A binding is an association between a queue and an exchange. Queues must be bound to at least one exchange in order to receive messages from publishers. Learn more about bindings in {file:docs/Bindings.textile Bindings guide}.

h3. Attributes

Queues have several attributes associated with them:

* Name
* Exclusivity
* Whether queue is auto-deleted when no longer used
* Other metadata (aka X-arguments)

These attributes define how queues can be used, what their lifecycle is like and other aspects of queue behavior.

amqp gem represents queues as instances of {AMQP::Queue}.

h2. Queue names. Server-named queues. Predefined queues.

Every queue has a name that identifies it. Queue names often contain several segments separated by a dot (.), similarly to how URI path segments are separated by a slash (/), although a name may be almost any string, with some limitations (see below).

Applications may pick queue names or ask the broker to generate a name for them. To do so, pass an *empty string* as the queue name argument. Here is an example:

# Declaring a server-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    AMQP::Queue.new(channel, "", :auto_delete => true) do |queue, declare_ok|
      puts "#{queue.name} is ready to go. AMQP method: #{declare_ok.inspect}"

      connection.close {
        EventMachine.stop { exit }
      }
    end
  end
end

If you want to declare a queue with a particular name, for example, "images.resize", pass it to the {AMQP::Queue} constructor:

# Declaring a client-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    AMQP::Queue.new(channel, "images.resize", :auto_delete => true) do |queue, declare_ok|
      puts "#{queue.name} is ready to go."

      connection.close {
        EventMachine.stop { exit }
      }
    end
  end
end
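
Applications that build queue names from configuration or user input may want to check them before declaring a queue. The sketch below is a hypothetical helper (@classify_queue_name@ and @RESERVED_PREFIX@ are names made up for this guide and are not part of amqp gem). It only distinguishes blank names, which ask the broker to generate a name, from names with the @amq.@ prefix and from ordinary client-picked names:

```ruby
# Hypothetical helper, not part of amqp gem: classifies a queue name
# before it is passed to AMQP::Queue.new or AMQP::Channel#queue.
RESERVED_PREFIX = "amq.".freeze

def classify_queue_name(name)
  if name.nil? || name.empty?
    # An empty string asks the broker to generate a name
    :server_named
  elsif name.start_with?(RESERVED_PREFIX)
    # Names with this prefix are reserved for the broker
    :reserved
  else
    :client_named
  end
end

["", "amq.custom", "images.resize"].each do |name|
  puts "#{name.inspect} => #{classify_queue_name(name)}"
end
```

A check like this is only a convenience for failing early in the application. The broker remains the authority on which names it accepts.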

Queue names starting with 'amq.' are reserved for internal use by the broker. Attempts to declare a queue with a name that violates this rule will result in a channel-level exception. Re-declaring an existing queue with different attributes is also an error: {AMQP::IncompatibleOptionsError} is raised when the queue is re-declared on the same channel object, and a channel-level exception occurs when the queue was originally declared on one channel and is re-declared with different attributes on another. Learn more in the Error handling and recovery section below.

h2. Common usage scenarios

h2. Queue life-cycles. When use of server-named queues is optimal and when it isn't.

To quote AMQP 0.9.1 spec, there are two common message queue life-cycles:

* Durable message queues that are shared by many consumers and have an independent existence: i.e. they will continue to exist and collect messages whether or not there are consumers to receive them.
* Temporary message queues that are private to one consumer and are tied to that consumer. When the consumer disconnects, the message queue is deleted.

There are some variations on these, such as shared message queues that are deleted when the last of many consumers disconnects.

One example of durable message queues is well-known services like event collectors (event loggers). They are usually up whether there are services to log anything or not. Other applications know what queues they use and can rely on those queues being around all the time, surviving broker restarts and in general being available should an application in the network need to use them. In this case, explicitly named durable queues are optimal and the coupling they create between applications is not an issue.

Another scenario of a well-known long-lived service is a distributed metadata/directory/locking server like Apache Zookeeper, Google's Chubby or DNS. Services like this benefit from using well-known, not generated, queue names, and so do other applications that use them.

A different scenario is a cloud setting, where workers/instances may come online and go down at basically any time and other applications cannot rely on them being available. Using well-known queue names in this case is possible, but server-generated, short-lived queues that are bound to topic or fanout exchanges to receive relevant messages are a better idea.

Imagine a service that processes an endless stream of events (Twitter is one example). When traffic goes up, development operations may spin up additional application instances in the cloud to handle the load. Those new instances want to subscribe to receive messages to process, but the rest of the system does not know anything about them, cannot rely on them being online and does not try to address them directly: they process events from a shared stream and are no different from their peers. In a case like this, there is no reason for message consumers not to use queue names generated by the broker.

In general, the choice between explicitly named and server-named queues depends on the messaging pattern your application needs. {http://www.eaipatterns.com/ Enterprise Integration Patterns} discusses many messaging patterns in depth. RabbitMQ FAQ also has a section on {http://www.rabbitmq.com/faq.html#scenarios use cases}.

h2. Declaring a durable shared queue

To declare a durable shared queue, pass a queue name that is a non-blank string and use the :durable option:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

# Declaring a client-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    AMQP::Queue.new(channel, "images.resize", :durable => true) do |queue, declare_ok|
      puts "#{queue.name} is ready to go."

      connection.close {
        EventMachine.stop { exit }
      }
    end
  end
end

The same example using {AMQP::Channel#queue} for convenience:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

# Declaring a client-named queue using AMQP::Channel#queue
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    channel.queue("images.resize", :durable => true) do |queue, declare_ok|
      puts "#{queue.name} is ready to go."

      connection.close {
        EventMachine.stop { exit }
      }
    end
  end
end

h2. Declaring a temporary exclusive queue

To declare a server-named, exclusive, auto-deleted queue, pass "" (empty string) as the queue name and use the :exclusive and :auto_delete options:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

# Declaring a server-named queue using AMQP::Queue constructor
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    AMQP::Queue.new(channel, "", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      puts "#{queue.name} is ready to go."

      connection.close {
        EventMachine.stop { exit }
      }
    end
  end
end

The same example using {AMQP::Channel#queue} for convenience:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

# Declaring a server-named queue using AMQP::Channel#queue
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      puts "#{queue.name} is ready to go."

      connection.close {
        EventMachine.stop { exit }
      }
    end
  end
end
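
The durable shared and temporary exclusive declarations above differ only in the name and options passed. As a minimal sketch, the hypothetical helper below (@queue_declaration_for@ is a name made up for this guide, not amqp gem API) maps each life-cycle to the name/options pair used in the examples, so the choice lives in one place:

```ruby
# Hypothetical helper, not part of amqp gem: returns the [name, options]
# pair for one of the two common queue life-cycles.
def queue_declaration_for(lifecycle, name = nil)
  case lifecycle
  when :shared
    # Durable shared queues need a well-known, non-blank name
    if name.nil? || name.strip.empty?
      raise ArgumentError, "shared queues need a non-blank name"
    end
    [name, { :durable => true }]
  when :temporary
    # Empty name: the broker generates one
    ["", { :exclusive => true, :auto_delete => true }]
  else
    raise ArgumentError, "unknown lifecycle: #{lifecycle.inspect}"
  end
end

name, options = queue_declaration_for(:shared, "images.resize")
puts "shared:    #{name.inspect} #{options.inspect}"

name, options = queue_declaration_for(:temporary)
puts "temporary: #{name.inspect} #{options.inspect}"
```

With a real channel, the pair would be passed on as in the examples above: @channel.queue(name, options) { |queue, declare_ok| ... }@.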

h2. Binding queues to exchanges

In order to receive messages, a queue needs to be bound to at least one exchange. Most of the time binding is explicit (done by applications). To bind a queue to an exchange, use {AMQP::Queue#bind}. The argument can be either an {AMQP::Exchange} instance or an exchange name:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

# Binding a queue to an exchange
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    exchange = channel.fanout("amq.fanout")

    channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      queue.bind(exchange) do |bind_ok|
        puts "Just bound #{queue.name} to #{exchange.name}"
      end

      connection.close {
        EventMachine.stop { exit }
      }
    end
  end
end

The same example, passing the exchange name instead of an {AMQP::Exchange} instance:


#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

# Binding a queue to an exchange
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    exchange_name = "amq.fanout"

    channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      queue.bind(exchange_name) do |bind_ok|
        puts "Just bound #{queue.name} to #{exchange_name}"
      end

      connection.close {
        EventMachine.stop { exit }
      }
    end
  end
end
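
To make the role of bindings concrete, here is a toy in-memory model. @ToyQueue@ and @ToyFanoutExchange@ are hypothetical classes written for this guide. They do not talk to a broker and are not amqp gem API. They only mimic the routing rule of a fanout exchange: a published message is copied to every bound queue, and a queue that is not bound receives nothing.

```ruby
# Toy model of fanout routing. Hypothetical classes, no broker involved.
class ToyQueue
  attr_reader :name, :messages

  def initialize(name)
    @name     = name
    @messages = []
  end
end

class ToyFanoutExchange
  def initialize
    @bound_queues = []
  end

  # Adds a binding; binding the same queue twice has no extra effect
  def bind(queue)
    @bound_queues << queue unless @bound_queues.include?(queue)
    self
  end

  # Copies the payload to every bound queue, returns the number of copies
  def publish(payload)
    @bound_queues.each { |queue| queue.messages << payload }
    @bound_queues.size
  end
end

exchange = ToyFanoutExchange.new
bound    = ToyQueue.new("bound")
unbound  = ToyQueue.new("unbound")

exchange.bind(bound)
exchange.publish("Ohai!")

puts "bound queue has #{bound.messages.size} message(s)"
puts "unbound queue has #{unbound.messages.size} message(s)"
```

This is why the examples in this guide bind a queue before publishing to the exchange.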

h2. Subscribing to receive messages ("push API")

Each queue usually has one or more consumers (message handlers). Without them, queues are not very useful, right? To subscribe to receive messages when they arrive in the queue ("start a queue consumer"), use the {AMQP::Queue#subscribe} method. Then, when a message arrives, the message header and body (aka payload) are passed to the handling block:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    exchange = channel.fanout("amq.fanout")

    channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      queue.bind(exchange).subscribe do |headers, payload|
        puts "Received a message: #{payload.inspect}. Shutting down..."

        connection.close {
          EM.stop { exit }
        }
      end

      EventMachine.add_timer(0.2) do
        exchange.publish("Ohai!")
      end
    end
  end
end

h3. Exclusive consumers

TBD

h2. Fetching messages when needed ("pull API")

AMQP 0.9.1 also provides a way for applications to fetch (pull) messages from the queue only when necessary. For that, use {AMQP::Queue#pop}:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    exchange = channel.fanout("amq.fanout")

    channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      queue.bind(exchange) do |_|
        puts "Bound. Publishing a message..."
        exchange.publish("Ohai!")
      end

      EventMachine.add_timer(0.5) do
        queue.pop do |response|
          puts "Fetched a message: #{response.inspect}. Shutting down..."

          connection.close {
            EM.stop { exit }
          }
        end
      end
    end
  end
end
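
The difference between the push and pull styles shown above can be sketched with a toy in-memory queue. @ToyQueue@ below is a hypothetical class written for this guide. It is not amqp gem API and ignores acknowledgements, multiple consumers and everything network-related. It only shows who initiates delivery: with a subscription the queue calls the handler as messages arrive, while with pop the application asks for one message at a time.

```ruby
# Toy model contrasting push and pull delivery. Hypothetical class,
# no broker involved.
class ToyQueue
  def initialize
    @messages = []
    @consumer = nil
  end

  # Push style: register a handler; any backlog is delivered right away
  def subscribe(&handler)
    @consumer = handler
    handler.call(@messages.shift) until @messages.empty?
    self
  end

  # Called when a message is routed to this queue
  def enqueue(payload)
    if @consumer
      @consumer.call(payload)
    else
      @messages << payload
    end
    self
  end

  # Pull style: fetch at most one message, nil when the queue is empty
  def pop
    @messages.shift
  end
end

pulled = ToyQueue.new
pulled.enqueue("first").enqueue("second")
fetched = pulled.pop

pushed   = ToyQueue.new
received = []
pushed.subscribe { |payload| received << payload }
pushed.enqueue("third")

puts "pulled on demand: #{fetched.inspect}"
puts "pushed to handler: #{received.inspect}"
```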

TBD

h2. Unsubscribing from messages

TBD

h2. Unbinding queues from exchanges

To unbind a queue from an exchange, use {AMQP::Queue#unbind}:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  puts "Connected"
  AMQP::Channel.new(connection) do |channel, open_ok|
    puts "Opened a channel"
    channel.on_error do |arg|
      raise "Channel-level exception!"
    end
    exchange = channel.fanout("amq.fanout")

    channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      queue.bind(exchange) do |_|
        puts "Bound"
      end

      EventMachine.add_timer(0.5) do
        queue.unbind(exchange) do |_|
          puts "Unbound. Shutting down..."

          connection.close {
            EM.stop { exit }
          }
        end
      end # EventMachine.add_timer
    end # channel.queue
  end
end

Note that unbinding a queue from an exchange it was never bound to will result in an exception.

h2. Purging queues

It is possible to purge (remove all messages from) a queue using {AMQP::Queue#purge}:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  puts "Connected"
  AMQP::Channel.new(connection) do |channel, open_ok|
    puts "Opened a channel"
    channel.on_error do |arg|
      raise "Channel-level exception!"
    end

    channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      queue.purge do |_|
        puts "Queue now has no messages"
      end

      EventMachine.add_timer(0.5) do
        connection.close {
          EM.stop { exit }
        }
      end # EventMachine.add_timer
    end # channel.queue
  end
end

The callback is optional. However, remember that this operation takes some time: without a callback, the application has no way to know when the purge has completed.

h2. Deleting queues

To delete a queue, use {AMQP::Queue#delete}:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672/") do |connection, open_ok|
  puts "Connected"
  AMQP::Channel.new(connection) do |channel, open_ok|
    puts "Opened a channel"
    channel.on_error do |arg|
      raise "Channel-level exception!"
    end

    channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      EventMachine.add_timer(0.5) do
        queue.delete do
          puts "Deleted a queue"
          connection.close {
            EM.stop { exit }
          }
        end
      end # EventMachine.add_timer
    end # channel.queue
  end
end

The callback can be omitted. However, remember that this operation takes some time.

h2. Queue durability vs Message durability

TBD

h2. Error handling and recovery

TBD

h2. Vendor-specific extensions related to queues

TBD

h2. What to read next

TBD

h2. Tell us what you think!

Please take a moment and tell us what you think about this guide on "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp: what was unclear? What wasn't covered? Maybe you don't like the guide style, or the grammar and spelling are incorrect? Reader feedback is key to making documentation better.

If mailing list communication is not an option for you for some reason, you can "contact the guides' author directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation