h1. Getting started with AMQP Ruby gem
h2. About this guide
This guide is a quick tutorial that helps you get started with AMQP 0.9.1 in general and the amqp gem in particular.
It should take about 20 minutes to read and study the provided code examples. This guide covers:
* Installing RabbitMQ, a mature and popular implementation of multiple versions of the AMQP protocol.
* Installing the amqp gem via "Rubygems":http://rubygems.org and "Bundler":http://gembundler.com.
* Running a "Hello, world"-like code example, a simple demonstration of 1:1 communication.
* Creating a "Twitter-like" publish/subscribe example with 1 publisher and 3 subscribers, a case of 1:n communication.
* Creating a topic routing example with multiple subscribers, a case of n:m communication in which subscribers only receive the messages they are interested in.
h2. Covered versions
This guide covers amqp gem v0.8.0 and later.
h2. Installing RabbitMQ
The RabbitMQ site has a good "installation guide":http://www.rabbitmq.com/install.html that covers many operating systems.
On Mac OS X, the fastest way to install RabbitMQ is with Homebrew:
brew install rabbitmq
then run it:
rabbitmq-server
On "Debian and Ubuntu":http://www.rabbitmq.com/install.html#debian, you can either download a .deb package and install it with
dpkg or use the apt repository that the RabbitMQ team provides. RabbitMQ packages in even recent (10.10) versions of Ubuntu are old and won't
work with amqp gem 0.8.0 and later (we need at least version 2.0). For "RPM-based distributions":http://www.rabbitmq.com/install.html#rpm like RedHat
or CentOS, the RabbitMQ team provides an RPM package.
h2. Installing amqp gem
h3. Make sure you have Ruby installed
This guide assumes you have one of the supported Ruby implementations installed:
* Ruby 1.8.7
* Ruby 1.9.2
* JRuby (we recommend 1.6)
* Rubinius 1.2 or higher
* Ruby Enterprise Edition
h3. With Rubygems
To install amqp gem 0.8.0, run
gem install amqp --pre
h3. With Bundler
gem "amqp", :git => "git://github.com/ruby-amqp/amqp.git", :branch => "master"
h3. Verifying your installation
Let's verify your installation with this quick irb session:
irb -rubygems
:001 > require "amqp"
=> true
:002 > AMQP::VERSION
=> "0.8.0.rc2"
h2. A "Hello, world" example
Let's begin with a classic "Hello, world" example. First, here's the code:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("amqpgem.examples.helloworld", :auto_delete => true)
  exchange = channel.direct("")
  queue.subscribe do |payload|
    puts "Received a message: #{payload}. Disconnecting..."
    connection.close {
      EM.stop { exit }
    }
  end
  exchange.publish "Hello, world!", :routing_key => queue.name
end
This example demonstrates a very common communication scenario: app A wants to publish a message that will end up in
a queue that app B listens on. In this example, the queue name is "amqpgem.examples.helloworld". Let's go through this example
step by step:
require "rubygems"
require "amqp"
is the simplest way to load the amqp gem if you have installed it with RubyGems. The following piece of code
EventMachine.run do
  # ...
end
runs what is called the EventMachine reactor. Without going into what exactly a reactor is,
let us say that the amqp gem is asynchronous and is based on an asynchronous network I/O library called "EventMachine":http://rubyeventmachine.com.
The next line
connection = AMQP.connect(:host => '127.0.0.1')
connects to the server running on localhost, with the default port, username, password and virtual host.
channel = AMQP::Channel.new(connection)
opens a channel. AMQP is a multi-channeled protocol: channels are a way to multiplex a single TCP connection.
Because channels are opened on a connection, the AMQP::Channel constructor takes a connection object as a parameter.
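To make the idea of multiplexing a little more concrete, here is a minimal sketch in plain Ruby. It does not use the amqp gem or a real socket: @Frame@ and @ToyConnection@ are hypothetical names made up for this illustration. The only fact it relies on is that every frame sent over the shared connection is tagged with the number of the channel it belongs to.

```ruby
# Toy model of channel multiplexing: every frame on the shared connection
# carries a channel number, and the receiver uses it to route the frame.
# Frame and ToyConnection are hypothetical names, not amqp gem classes.
Frame = Struct.new(:channel_id, :payload)

class ToyConnection
  def initialize
    @handlers = {}
  end

  # Register a handler for one logical channel.
  def open_channel(id, &handler)
    @handlers[id] = handler
  end

  # Dispatch an incoming frame to the channel it belongs to.
  def receive(frame)
    handler = @handlers.fetch(frame.channel_id)
    handler.call(frame.payload)
  end
end

connection = ToyConnection.new
received   = Hash.new { |hash, key| hash[key] = [] }

connection.open_channel(1) { |payload| received[1] << payload }
connection.open_channel(2) { |payload| received[2] << payload }

# Frames for both channels arrive interleaved over the same "socket".
[Frame.new(1, "a"), Frame.new(2, "x"), Frame.new(1, "b")].each do |frame|
  connection.receive(frame)
end

p received
```

Each channel only sees its own payloads, in order, even though all frames travelled over one connection.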
This line
queue = channel.queue("amqpgem.examples.helloworld", :auto_delete => true)
declares a queue on the channel we've just opened. Queues are where consumer applications get messages from.
We declare this queue with the "auto-delete" parameter. Basically, that means "when there is no one left
consuming messages from this queue, delete it".
The next line,
exchange = channel.direct("")
instantiates an exchange. An exchange is where producers send messages. Exchanges route messages to queues
according to rules called bindings. In this particular example, there are no explicitly defined bindings.
The exchange we defined is known as the default exchange, and it has an implied binding to every queue. Before we get
into that, let's see how we define a handler for incoming messages:
queue.subscribe do |payload|
  puts "Received a message: #{payload}. Disconnecting..."
  connection.close {
    EM.stop { exit }
  }
end
{AMQP::Queue#subscribe} takes a block that will be called every time a message arrives. {AMQP::Session#close} closes
the AMQP connection and runs a callback that stops the EventMachine reactor.
Finally, we publish our message:
exchange.publish "Hello, world!", :routing_key => queue.name
The routing key is one of the _message attributes_. The default exchange will route a message to the queue that has the same name
as the message's routing key. This is how our message ends up in the amqpgem.examples.helloworld queue.
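The routing rule of the default exchange can be sketched with a few lines of plain Ruby. This is a toy in-memory model, not how the broker or the amqp gem is implemented: @ToyDefaultExchange@ is a hypothetical class, and dropping a message whose routing key matches no queue is a simplification made for this sketch.

```ruby
# Toy in-memory model of the default exchange. ToyDefaultExchange is a
# hypothetical class for illustration, not part of the amqp gem.
class ToyDefaultExchange
  def initialize
    @queues = {}
  end

  # Declaring a queue implicitly binds it under its own name.
  def declare_queue(name)
    @queues[name] ||= []
  end

  # Route the message to the queue named by the routing key, if one exists.
  # Returns true when the message was routed and false otherwise.
  def publish(payload, options = {})
    queue = @queues[options[:routing_key]]
    return false if queue.nil?
    queue << payload
    true
  end
end

exchange = ToyDefaultExchange.new
inbox    = exchange.declare_queue("amqpgem.examples.helloworld")

routed   = exchange.publish("Hello, world!", :routing_key => "amqpgem.examples.helloworld")
unrouted = exchange.publish("Lost", :routing_key => "no.such.queue")

puts "inbox: #{inbox.inspect}, routed: #{routed}, unrouted: #{unrouted}"
```

The first message lands in the queue because the routing key equals the queue name; the second one has nowhere to go.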
This first example can be modified to use the method chaining technique:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
EventMachine.run do
  AMQP.connect(:host => '127.0.0.1') do |connection|
    puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
    channel = AMQP::Channel.new(connection)
    channel.queue("amqpgem.examples.helloworld", :auto_delete => true).subscribe do |payload|
      puts "Received a message: #{payload}. Disconnecting..."
      connection.close {
        EM.stop { exit }
      }
    end
    # There is no queue variable in this version, so the routing key is spelled out.
    channel.direct("").publish "Hello, world!", :routing_key => "amqpgem.examples.helloworld"
  end
end
With the classes and methods introduced in this example, let's move on to a slightly more
sophisticated one.
h2. Blabbr: one-to-many publish/subscribe example
The previous example demonstrated how a connection to the broker is made and how to do 1:1 communication
using the default exchange. Now let's take a look at another common scenario: broadcast, or multiple consumers
and one producer.
A very well known example of broadcast is Twitter: every time a person tweets, followers receive a notification.
Blabbr, our imaginary information network, models this scenario this way: every network member has a separate
queue and publishes blabs to a separate exchange. Three Blabbr members, Joe, Aaron and Bob, follow the official NBA
account on Blabbr to get updates about what is up in the world of basketball. Here is the code:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
AMQP.start("amqp://dev.rabbitmq.com:5672/") do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.fanout("nba.scores")
  channel.queue("joe", :auto_delete => true).bind(exchange).subscribe do |payload|
    puts "#{payload} => joe"
  end
  channel.queue("aaron", :auto_delete => true).bind(exchange).subscribe do |payload|
    puts "#{payload} => aaron"
  end
  channel.queue("bob", :auto_delete => true).bind(exchange).subscribe do |payload|
    puts "#{payload} => bob"
  end
  exchange.publish("BOS 101, NYK 89").publish("ORL 85, ALT 88")
  # disconnect & exit after 1 second
  EventMachine.add_timer(1) do
    exchange.delete
    connection.close {
      EM.stop { exit }
    }
  end
end
The first line has a few differences from the "Hello, world" example above:
* We use {AMQP.start} instead of {AMQP.connect}.
* Instead of using the return value, we pass the connection method a block, and it yields the connection object back as soon as the connection is established.
* Instead of passing connection parameters as a hash, we use a URI string.
{AMQP.start} is just a convenient way to do
EventMachine.run do
  AMQP.connect(options) do |connection|
    # ...
  end
end
The {AMQP.start} call blocks the current thread, so its use is limited to scripts and small command
line applications. Blabbr is just that.
{AMQP.connect}, when invoked with a block, will yield a connection object to it as soon as the AMQP connection
is open. Finally, connection parameters may be given as a Hash or as a connection string. The {AMQP.connect}
method documentation has all the details.
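To see how the two forms of connection parameters relate, here is a small sketch that breaks a connection string into a Hash using only Ruby's standard @uri@ library. The helper name @connection_options_from@ and the Hash keys are assumptions made for this illustration; the keys the amqp gem actually accepts are described in the {AMQP.connect} documentation. The sketch deliberately leaves the URI path (virtual host) alone.

```ruby
require "uri"

# Hypothetical helper (not part of the amqp gem): turn a connection string
# into a Hash of the pieces a client needs. Keys that the URI does not
# specify are left out so that a client can apply its own defaults.
def connection_options_from(uri_string)
  uri     = URI.parse(uri_string)
  options = { :host => uri.host }
  options[:port]     = uri.port     if uri.port
  options[:user]     = uri.user     if uri.user
  options[:password] = uri.password if uri.password
  options
end

p connection_options_from("amqp://dev.rabbitmq.com:5672/")
p connection_options_from("amqp://guest:secret@127.0.0.1")
```

The first string yields a host and a port; the second yields a host plus credentials and no port, because none was given.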
Opening a channel in this example is no different from opening a channel in the example before that,
but exchange is instantiated differently:
exchange = channel.fanout("nba.scores")
The exchange we declare above using {AMQP::Channel#fanout} is a _fanout exchange_. A fanout exchange delivers messages to every queue that
is bound to it: exactly what we want in the case of Blabbr!
This piece of code
channel.queue("joe", :auto_delete => true).bind(exchange).subscribe do |payload|
  puts "#{payload} => joe"
end
is similar to how we subscribed for message delivery before, but what does that {AMQP::Queue#bind}
method do? It sets up a _binding_ between the queue and an exchange you pass to it. We need to do this
to make sure that our fanout exchange routes messages to follower queues.
exchange.publish("BOS 101, NYK 89").publish("ORL 85, ALT 88")
demonstrates chaining of {AMQP::Exchange#publish} calls. Because Blabbr members use a fanout exchange
for publishing, there is no need to specify a routing key: every queue that is bound to the exchange receiving
a message will get its own copy of the message, regardless of the queue name and routing key used.
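The fanout behaviour described above can be modelled in plain Ruby without a broker. This is only a sketch: @ToyFanoutExchange@ is a hypothetical class, and queues are represented by ordinary arrays.

```ruby
# Toy in-memory model of a fanout exchange. ToyFanoutExchange is a
# hypothetical class for illustration, not part of the amqp gem.
class ToyFanoutExchange
  def initialize
    @bound_queues = []
  end

  # Bind a queue (modelled as a plain Array) to this exchange.
  def bind(queue)
    @bound_queues << queue
    self
  end

  # Deliver a separate copy of the payload to every bound queue. The routing
  # key is accepted but ignored. Returns self so that calls can be chained.
  def publish(payload, options = {})
    @bound_queues.each { |queue| queue << payload.dup }
    self
  end
end

joe, aaron, bob = [], [], []
exchange = ToyFanoutExchange.new
exchange.bind(joe).bind(aaron).bind(bob)

exchange.publish("BOS 101, NYK 89").publish("ORL 85, ALT 88", :routing_key => "ignored")

puts "joe: #{joe.inspect}"
```

All three arrays end up with both messages, and each holds its own copy of every string.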
Next we use EventMachine's {http://eventmachine.rubyforge.org/EventMachine.html#M000466 add_timer} method to
run a piece of code 1 second from now:
EventMachine.add_timer(1) do
  exchange.delete
  connection.close {
    EM.stop { exit }
  }
end
The code we want to run deletes the exchange we declared earlier using {AMQP::Exchange#delete} and closes the AMQP
connection with {AMQP::Session#close}. Finally, we stop the EventMachine event loop and exit.
Blabbr is pretty unlikely to secure hundreds of millions in funding but it does a pretty good job of
demonstrating how one can use AMQP fanout exchanges to do broadcasting.
h2. Weathr: many-to-many topic routing example
So far we have seen point-to-point communication and broadcast. These two are possible with many protocols:
HTTP handles these scenarios just fine. What differentiates AMQP? The next example introduces topic
exchanges and routing with patterns, one of the features that make AMQP very powerful.
Our third example is weather condition updates. What makes it different from the previous two is that
not all consumers are interested in all messages: people who live in Portland usually don't care about
weather in Hong Kong very much (unless they are going there soon). They are certainly interested in
weather conditions around Portland, possibly all of Oregon and sometimes a few neighbouring states.
Our example features multiple consumer applications monitoring updates for different regions. Some are
interested in updates for a specific city, others for a specific state and so on all the way up to continents.
Updates may overlap: an update for San Diego, CA _is_ an update for California, and should certainly show up
on North America updates list.
Here is the code:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
EventMachine.run do
  AMQP.connect do |connection|
    channel  = AMQP::Channel.new(connection)
    exchange = channel.topic("pub/sub", :auto_delete => true)
    # Subscribers.
    channel.queue("", :exclusive => true) do |queue|
      queue.bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
        puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
      end
    end
    channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
      puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
    end
    channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload|
      puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}"
    end
    channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
      puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
    end
    channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload|
      puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}"
    end
    channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload|
      puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}"
    end
    EM.add_timer(1) do
      exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
        publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley").
        publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco").
        publish("New York update", :routing_key => "americas.north.us.ny.newyork").
        publish("São Paolo update", :routing_key => "americas.south.brazil.saopaolo").
        publish("Hong Kong update", :routing_key => "asia.southeast.hk.hongkong").
        publish("Kyoto update", :routing_key => "asia.southeast.japan.kyoto").
        publish("Shanghai update", :routing_key => "asia.southeast.prc.shanghai").
        publish("Rome update", :routing_key => "europe.italy.roma").
        publish("Paris update", :routing_key => "europe.france.paris")
    end
    show_stopper = Proc.new {
      connection.close do
        EM.stop
      end
    }
    EM.add_timer(2, show_stopper)
  end
end
The first line that differs from the Blabbr example is
exchange = channel.topic("pub/sub", :auto_delete => true)
We use a _topic exchange_ here. Topic exchanges are used for "multicast":http://en.wikipedia.org/wiki/Multicast messaging
where consumers indicate which topics they are interested in (think of it as subscribing to a feed for an individual tag
of your favourite blog as opposed to the full feed). They do it by specifying a _routing pattern_ on the binding, for example:
channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
  puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
end
Here we bind a queue named "americas.south" to the topic exchange declared earlier using the {AMQP::Queue#bind} method.
This means that only messages with a routing key matching americas.south.# will be routed to that queue. A routing pattern consists of several words
separated by dots, similar to URI path segments joined by slashes. A few examples:
* asia.southeast.thailand.bangkok
* sports.basketball
* usa.nasdaq.aapl
* tasks.search.indexing.accounts
Now let's take a look at a few routing keys that match the "americas.south.#" pattern:
* americas.south
* americas.south.*brazil*
* americas.south.*brazil.saopaolo*
* americas.south.*chile.santiago*
In other words, the # part of the pattern matches zero or more words. For "americas.south.*", some matching routing keys are
* americas.south.*brazil*
* americas.south.*chile*
* americas.south.*peru*
but not
* americas.south
* americas.south.chile.santiago
so * matches a single word, whatever it is. AMQP 0.9.1 spec says that topic segments (words) may contain the letters A-Z and a-z
and digits 0-9.
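The matching rules above are compact enough to express as a short Ruby function. The following is a sketch of the rules as this guide describes them, not the broker's actual implementation; @topic_match?@ and @match_words?@ are hypothetical helper names.

```ruby
# Returns true if routing_key matches the topic pattern, where "*" stands
# for exactly one word and "#" stands for zero or more words.
# topic_match? and match_words? are hypothetical helpers for illustration.
def topic_match?(pattern, routing_key)
  match_words?(pattern.split("."), routing_key.split("."))
end

def match_words?(pattern_words, key_words)
  # An exhausted pattern matches only an exhausted key.
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # Try letting "#" consume 0, 1, 2, ... words of the key.
    (0..key_words.length).any? { |n| match_words?(rest, key_words.drop(n)) }
  when "*"
    # "*" must consume exactly one word.
    !key_words.empty? && match_words?(rest, key_words.drop(1))
  else
    # A literal word must be equal to the next word of the key.
    key_words.first == head && match_words?(rest, key_words.drop(1))
  end
end

puts topic_match?("americas.south.#", "americas.south.brazil.saopaolo")
puts topic_match?("americas.south.*", "americas.south.chile.santiago")
```

Trying the function on the keys and patterns listed above is a quick way to check your understanding before binding real queues.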
One more thing that is different from previous examples is that the block we pass to {AMQP::Queue#subscribe} now takes two arguments:
header and body (aka payload). Long story short, the _header_ parameter lets you access metadata associated with the message. Some
examples of message metadata attributes are
* message content type
* message content encoding
* message priority
* message expiration time
* message identifier
* reply to, the address (commonly a queue name) that replies to this message should be sent to
* application id, identifier of application that produced the message
and so on.
As this binding demonstrates, # (and *) can appear in the beginning of routing patterns, too:
channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
  puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
end
Publishing messages is no different from the previous examples. Running this example shows that, for instance,
a message published with the routing key "americas.north.us.ca.berkeley" is routed to several queues: us.california and
the _server-named queue_ we declared by passing a blank string as the name:
channel.queue("", :exclusive => true) do |queue|
  queue.bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
    puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
  end
end
The name of a server-named queue is generated by the broker and sent back to the client with the queue declaration confirmation.
Because the queue name is not known before the reply arrives, we passed {AMQP::Channel#queue} a callback, and it yielded
a queue object back to us once the confirmation had arrived.
h3. Avoid race conditions
A word of warning: you may find examples on the Web of {AMQP::Channel#queue} usage that do not use
a callback. We *strongly recommend you always use a callback for server-named queues*. Otherwise your code may be subject
to "race conditions":http://en.wikipedia.org/wiki/Race_condition. Even though the amqp gem tries to be reasonably smart and protect you from the most common problems, there
is no way it can do so in every case. The only reason we support {AMQP::Channel#queue} usage without a callback for server-named queues is
backwards compatibility with earlier versions.
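The kind of race we have in mind can be shown with a toy simulation in plain Ruby. Nothing here talks to a real broker: @ToyBroker@, @ToyQueue@ and the generated name are all made up for this sketch, and the "network reply" is simply a block that runs later.

```ruby
# Toy model of an asynchronous queue declaration. ToyBroker and ToyQueue are
# hypothetical names; nothing here talks to a real broker.
ToyQueue = Struct.new(:name)

class ToyBroker
  def initialize
    @pending = []
  end

  # Returns a queue object right away, but a server-generated name is only
  # assigned when the (simulated) reply is processed.
  def declare_queue(name = "", &callback)
    queue = ToyQueue.new(name)
    @pending << lambda do
      queue.name = "generated.queue.1" if name.empty?
      callback.call(queue) if callback
    end
    queue
  end

  # Simulate replies arriving from the network.
  def process_replies
    @pending.shift.call until @pending.empty?
  end
end

broker = ToyBroker.new

name_seen_in_callback = nil
queue = broker.declare_queue("") { |q| name_seen_in_callback = q.name }

# Racy: the reply has not arrived yet, so the name is still blank.
name_seen_too_early = queue.name

broker.process_replies

puts "too early: #{name_seen_too_early.inspect}, in callback: #{name_seen_in_callback.inspect}"
```

Code that reads the name straight after the declaration sees a blank string, while code inside the callback sees the generated name. That is why anything that depends on the name, such as publishing with it as a routing key, belongs in the callback.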
h2. Wrapping up
This tutorial ends here. Congratulations! You have learned quite a bit about both AMQP 0.9.1 and amqp gem.
h2. What to read next
Documentation is organized as a {file:docs/DocumentationGuidesIndex.textile number of guides}, covering all kinds of
topics from {file:docs/Routing.textile routing} to {file:docs/ErrorHandling.textile error handling} to
{file:docs/VendorSpecificExchanges.textile Broker-specific AMQP 0.9.1 extensions}.
To learn more on what you have seen in this tutorial, check out
* {file:docs/ConnectingToTheBroker.textile Connection to the broker}
* {file:docs/Queues.textile Queues}
* {file:docs/Exchanges.textile Exchanges}
* {file:docs/Bindings.textile Bindings}
If you are migrating your application from earlier versions of the amqp gem (0.6.x and 0.7.x) to 0.8.x and later, see the
{file:docs/08Migration.textile amqp gem 0.8 migration guide}.
h2. Tell us what you think!
Please take a moment and tell us what you think about this guide on the "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp:
What was unclear? What wasn't covered? Maybe you don't like the guide's style, or the grammar and spelling are incorrect? Reader feedback is
key to making the documentation better.
If mailing list communication is not an option for you for some reason, you can "contact the guides' author directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation