h1. Getting started with AMQP Ruby gem h2. About this guide This guide is a quick tutorial that helps you to get started with AMQP 0.9.1 in general and amqp gem in particular. It should take about 20 minutes to read and study provided code examples. This guide covers * Installing RabbitMQ, a mature popular implementation of multiple versions of AMQP protocol. * Installing amqp gem via "Rubygems":http://rubygems.org and "Bundler":http://gembundler.com. * Running a "Hello, world"-like code example, a simple demonstration of 1:1 communication. * Creating a "Twitter like" publish/subscribe example with 1 publisher and 4 subscribers, a case of 1:n communication. * Creating a topic routing example with 2 publishers and 8 subscribers, a case of n:m communication when subscribers only receive messages they are interested in. h2. Covered versions This guide covers amqp gem v0.8.0 and later. h2. Installing RabbitMQ RabbitMQ site has a good "installation guide":http://www.rabbitmq.com/install.html that covers many operating systems. On Mac OS X, the fastest way to install RabbitMQ is with Homebrew: brew install rabbitmq then run it: rabbitmq-server On "Debian and Ubuntu":http://www.rabbitmq.com/install.html#debian, you can either download a .deb package and install it with dpkg or use apt repository RabbitMQ team provides. RabbitMQ package in even recent (10.10) versions of Ubuntu are old and won't work with amqp gem 0.8.0 and later (we need at least version 2.0). For "RPM-based distributions":http://www.rabbitmq.com/install.html#rpm like RedHat or CentOS RabbitMQ team provides an RPM package. h2. Installing amqp gem h3. Make sure you have Ruby installed This guides assumes you have one of the supported Ruby implementations installed: * Ruby 1.8.7 * Ruby 1.9.2 * JRuby (we recommend 1.6) * Rubinius 1.2 or higher * Ruby Enterprise Edition h3. With Rubygems To get amqp gem 0.8.0 gem install amqp --pre h3. With Bundler gem "amqp", :git => "git://github.com/ruby-amqp/amqp.git", :branch => "master" h3. Verifying your installation Lets verify your installation with this quick irb session: irb -rubygems :001 > require "amqp" => true :002 > AMQP::VERSION => "0.8.0.rc2" h2. A "Hello, world" example Lets begin with a classic Hello, world example. First, here's the code: #!/usr/bin/env ruby # encoding: utf-8 require "rubygems" require "amqp" EventMachine.run do connection = AMQP.connect(:host => '127.0.0.1') puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..." channel = AMQP::Channel.new(connection) queue = channel.queue("amqpgem.examples.hello_world", :auto_delete => true) exchange = channel.direct("") queue.subscribe do |payload| puts "Received a message: #{payload}. Disconnecting..." connection.close { EM.stop { exit } } end exchange.publish "Hello, world!", :routing_key => queue.name end This example demonstrates a very common communication scenario: app A wants to publish a message that will end up in a queue that app B listens on. In this example, queue name is "amqpgem.examples.hello". Lets go through this example step by step:

require "rubygems"
require "amqp"

is the simplest way to load amqp gem if you have installed it with RubyGems. The following piece of code

EventMachine.run do
  # ...
end

runs what is called EventMachine reactor. Without paying much attention to what exactly does reactor mean in this case, let us say that amqp gem is asynchronous and is based on an asynchronous network I/O library called "EventMachine":http://rubyeventmachine.com. Next line

connection = AMQP.connect(:host => '127.0.0.1')

connects to the server running on localhost, with default port, username, password and virtual host.

channel  = AMQP::Channel.new(connection)

opens the channel. AMQP is a multi-channeled protocol. Channels is a way to multiplex a TCP connection. Because channels are open on a connection, AMQP::Channel constructor takes connection object as a parameter. This line

queue    = channel.queue("amqpgem.examples.helloworld", :auto_delete => true)

declares a queue on the channel we've just opened. Queues are where consumer applications get messages from. We declare this queue with "auto-delete" parameter. Basically, that means "when there is no one left consuming messages from this queue, delete it". The next line,

exchange = channel.direct("")

instantiates an exchange. Exchange is where messages are sent by producers. Exchanges route messages to queues according to rules called bindings. In this particular example, there are no explicitly defined bindings. Exchange we defined is known as default exchange and it has implied binding to all queues. Before we get into that, lets see how we define a handler for incoming messages:

queue.subscribe do |payload|
  puts "Received a message: #{payload}. Disconnecting..."

  connection.close {
    EM.stop { exit }
  }
end

{AMQP::Queue#subscribe} takes a block that will be called every time a message arrives. {AMQP::Session#close} closes AMQP connection and runs a callback that stops EventMachine reactor. Finally, we publish our message:

exchange.publish "Hello, world!", :routing_key => queue.name

Routing key is one of _message attributes_. Default exchange will route message to a queue that has the same name as message's routing key. This is how our message ends up in amqpgem.examples.helloworld queue. This first example can be modified to use method chaining technique:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

EventMachine.run do
  AMQP.connect(:host => '127.0.0.1') do |connection|
    puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."

    channel  = AMQP::Channel.new(connection)

    channel.queue("amqpgem.examples.helloworld", :auto_delete => true).subscribe do |payload|
      puts "Received a message: #{payload}. Disconnecting..."

      connection.close {
        EM.stop { exit }
      }
    end

    channel.direct("").publish "Hello, world!", :routing_key => queue.name
  end
end

With classes and methods introduced in this example, lets move on to a little bit more sophisticated one. h2. Babblr: one-to-many publish/subscribe example Previous example demonstrated how connection to the broker is made and how to do 1:1 communication using default exchange. Now lets take a look at another common scenario: broadcast, or multiple consumers and one producer. A very well know example of broadcast is Twitter: every time a person tweets, followers receive a notification. Blabbr, our imaginary information network, models this scenario this way: every network member has a separate queue and publishes blabs to a separate exchange. 3 Blabbr members, Joe, Aaron and Bob, follow official NBA account on Blabbr to get updates about what is up in the world of basketball. Here is the code:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

AMQP.start("amqp://dev.rabbitmq.com:5672/") do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.fanout("nba.scores")

  channel.queue("joe", :auto_delete => true).bind(exchange).subscribe do |payload|
    puts "#{payload} => joe"
  end

  channel.queue("aaron", :auto_delete => true).bind(exchange).subscribe do |payload|
    puts "#{payload} => aaron"
  end

  channel.queue("bob", :auto_delete => true).bind(exchange).subscribe do |payload|
    puts "#{payload} => bob"
  end

  exchange.publish("BOS 101, NYK 89").publish("ORL 85, ALT 88")

  # disconnect & exit after 1 second
  EventMachine.add_timer(1) do
    exchange.delete

    connection.close {
      EM.stop { exit }
    }
  end
end

First line has a few difference from "Hello, world" example above: * We use {AMQP.start} instead of {AMQP.connect} * Instead of return values, we pass connection method a block and it yields connection object back as soon as connection is established. * Instead of passing connection parameters as a hash, we used a URI string. {AMQP.start} is just a convenient way to do

EventMachine.run do
  AMQP.connect(options) do |connection|
    # ...
  end
end

{AMQP.start} call blocks current thread so it's use is limited to scripts and small command line applications. Blabbr is just that. {AMQP.connect}, when invoked with a block, will yield connection object to it as soon as AMQP connection is open. Finally, connection parameters maybe given as a Hash or as a connection string. {AMQP.connect} method documentation has all the details. Opening a channel in this example is no different from opening a channel in the example before that, but exchange is instantiated differently:

exchange = channel.fanout("nba.scores")

Exchange we declare above using {AMQP::Channel#fanout} is a _fanout exchange_. Fanout exchanges deliver messages to every queue that was bound to it: exactly what we want in case of Blabbr! This piece of code

channel.queue("joe", :auto_delete => true).bind(exchange).subscribe do |payload|
  puts "#{payload} => joe"
end

is similar to how we subscribed for message delivery before, but what does that {AMQP::Queue#bind} method do? It sets up a _binding_ between the queue and an exchange you pass to it. We need to do this to make sure that our fanout exchange routes messages to follower queues.

exchange.publish("BOS 101, NYK 89").publish("ORL 85, ALT 88")

demonstrates {AMQP::Exchange#publish} calls chaining. Because Blabbr members use fanout exchange for publishing, there is no need to specify routing key: every queue that was bound to exchange receiving a message will get it's own message copy, regardless of queue name and routing key used. Next we use EventMachine's {http://eventmachine.rubyforge.org/EventMachine.html#M000466 add_timer} method to run a piece of code in 1 second from now:

EventMachine.add_timer(1) do
  exchange.delete

  connection.close {
    EM.stop { exit }
  }
end

The code we want to run deletes exchange we declared earlier using {AMQP::Exchange#delete} and closes AMQP connection with {AMQP::Session#close}. Finally, we stop EventMachine event loop and exit. Blabbr is pretty unlikely to secure hundreds of millions in funding but it does a pretty good job of demonstrating how one can use AMQP fanout exchanges to do broadcasting. h2. Weathr: many-to-many topic routing example So far we have seen point-to-point communication and broadcast. These two are possible with many protocols: HTTP handles these scenarios just fine. What differentiates AMQP? Next is going to introduce you to topic exchanges and routing with patterns, one of the features that makes AMQP very powerful. Our third example is weather condition updates. What makes it different from the previous two is that not all consumers are interested in all messages: people who live in Portland usually don't care about weather in Hong Kong very much (unless they are going there soon). They are certainly interested in weather conditions around Portland, possibly all of Oregon and sometimes a few neighbouring states. Our example features multiple consumer applications monitoring updates for different regions. Some are interested in updates for a specific city, others for a specific state and so on all the way up to continents. Updates may overlap: an update for San Diego, CA _is_ an update for California, and should certainly show up on North America updates list. Here is the code:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

EventMachine.run do
  AMQP.connect do |connection|
    channel  = AMQP::Channel.new(connection)
    exchange = channel.topic("pub/sub", :auto_delete => true)

    # Subscribers.
    channel.queue("", :exclusive => true) do |queue|
      queue.bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
        puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
      end
    end
    channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
      puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
    end
    channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload|
      puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}"
    end
    channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
      puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
    end
    channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload|
      puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}"
    end
    channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload|
      puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}"
    end

    EM.add_timer(1) do
      exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
        publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
        publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
        publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
        publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
        publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
        publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
        publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
        publish("Rome update",             :routing_key => "europe.italy.roma").
        publish("Paris update",            :routing_key => "europe.france.paris")
    end


    show_stopper = Proc.new {
      connection.close do
        EM.stop
      end
    }

    EM.add_timer(2, show_stopper)
  end
end

First line that is different from Blabbr example is

exchange = channel.topic("pub/sub", :auto_delete => true)

We use a _topic exchange_ here. Topic exchanges are used for "multicast":http://en.wikipedia.org/wiki/Multicast messaging where consumers indicate what topics they are interested in (think of it as of subscribing to a feed for individual tag of your favourite blog as opposed to full feed). They do it by specifying _routing pattern_ on binding, for example:

channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
  puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
end

Here we bind a queue with the name of "americas.south" to the topic exchange declared earlier using {AMQP::Queue#bind} method. This means that only messages with routing key matching americas.south.# will be routed to that queue. Routing pattern consists of several words separated by dots, similarly to URI path segments joined by slash. A few of examples: * asia.southeast.thailand.bangkok * sports.basketball * usa.nasdaq.aapl * tasks.search.indexing.accounts Now lets take a look at a few routing keys that do match "americas.south.#" pattern: * americas.south * americas.south.*brazil* * americas.south.*brazil.saopaolo* * americas.south.*chile.santiago* In other words, # part of the pattern matches 0 or more words. For "americas.south.*", some of matching routing keys are * americas.south.*brazil* * americas.south.*chile* * americas.south.*peru* but not * americas.south * americas.south.chile.santiago so * matches a single word, whatever it is. AMQP 0.9.1 spec says that topic segments (words) may contain the letters A-Z and a-z and digits 0-9. One more thing that is different from previous examples is that the block we pass to {AMQP::Queue#subscribe} now takes two arguments: header and body (aka payload). Long story short, the _header_ parameter lets you access metadata associated with the message. Some examples of message metadata attributes are * message content type * message content encoding * message priority * message expiration time * message identifier * reply to, to what message this message is a reply to * application id, identifier of application that produced the message and so on. As this binding demonstrates, # (and *) can appear in the beginning of routing patterns, too:

channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
  puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
end

Publishing of messages is not different from previous examples. Running this example demonstrates that, for example, message published with routing key of "americas.north.us.ca.berkeley" is routed to several queues: us.california and _server-named queue_ we declared by passing blank string as the name:

channel.queue("", :exclusive => true) do |queue|
  queue.bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
    puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
  end
end

Name of server-named queue is generated by the broker and sent back to the client with queue declaration confirmation. Because of queue name is not known before reply arrives, we passed {AMQP::Channel#queue} a callback and it yielded us back a queue object once confirmation has arrived. h3. Avoid race conditions A word of warning: you may find examples on the Web of {AMQP::Channel#queue} usage that do not use callback: we *strongly recommend you always use a callback for server-named queues*. Otherwise your code may be a subject to "race conditions":http://en.wikipedia.org/wiki/Race_condition and even though amqp gem tries to be reasonably smart and protect you from most common problems, there is no way it can do so for every case. The only reason we support {AMQP::Channel#queue} usage w/o a callback for server-named queues is backwards compatibility with earlier versions. h2. Wrapping up This tutorial ends here. Congratulations! You have learned quite a bit about both AMQP 0.9.1 and amqp gem. h2. What to read next Documentation is organized as a {file:docs/DocumentationGuidesIndex.textile Routing guide number of guides}, covering all kinds of topics from {file:docs/Routing.textile routing} to {file:docs/ErrorHandling.textile error handling} to {file:docs/VendorSpecificExchanges.textile Broker-specific AMQP 0.9.1 extensions}. To learn more on what you have seen in this tutorial, check out * {file:docs/ConnectingToTheBroker.textile Connection to the broker} * {file:docs/Queues.textile Queues} * {file:docs/Exchanges.textile Exchanges} * {file:docs/Bindings.textile Bindings} If you are migrating your application from earlier versions of amqp gem (0.6.x and 0.7.x), to 0.8.x and later, there is {file:docs/08Migration.textile amqp gem 0.8 migration guide}. h2. Tell us what you think! Please take a moment and tell us what you think about this guide on "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp: what was unclear? what wasn't covered? maybe you don't like guide style or grammar and spelling are incorrect? Readers feedback is key to making documentation better. If mailing list communication is not an option for you for some reason, you can "contact guides author directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation