# @title Ruby amqp gem: Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
h1. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
h2. About this guide
This guide covers connection to AMQP broker from standalone and Web applications, connection error handling, authentication failure handling and related issues.
h2. Covered versions
This guide covers "Ruby amqp gem":http://github.com/ruby-amqp/amqp v0.8.0 and later.
h2. Terminology
In this guide we define a standalone application as an application that does not run on a Web server like Unicorn or Passenger. The key difference is that these applications control the main Ruby VM thread and often use it to run EventMachine event loop. When amqp gem is used inside a Web application, the main thread is occupied by the Web application server and code required to establish connection to AMQP broker needs to be a little bit different.
h2. Two ways to specify connection parameters
Connection parameters (host, port, username, vhost and so on) can be passed in two forms:
* As a hash
* As a connection URI string (à la JDBC)
h3. Using a hash
Hash options amqp gem will recognize are
* :host
* :port
* :username (aliased as :user)
* :password (aliased as :pass)
* :vhost
* :ssl
* :timeout
* :frame_max
h4. Default parameters
Default connection parameters are
{
  :host      => "127.0.0.1",
  :port      => 5672,
  :user      => "guest",
  :pass      => "guest",
  :vhost     => "/",
  :ssl       => false,
  :frame_max => 131072
}
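To make the relationship between defaults, aliases and user-supplied options concrete, here is a minimal sketch in plain Ruby. It is not the gem's implementation: DEFAULT_SETTINGS, ALIASES and effective_settings are hypothetical names used only for illustration, and the assumption is simply that aliased keys are normalized first and then merged over the defaults listed above.

```ruby
# Hypothetical illustration, not amqp gem code.
DEFAULT_SETTINGS = {
  :host      => "127.0.0.1",
  :port      => 5672,
  :user      => "guest",
  :pass      => "guest",
  :vhost     => "/",
  :ssl       => false,
  :frame_max => 131072
}.freeze

# :username and :password are treated as aliases of :user and :pass
ALIASES = { :username => :user, :password => :pass }.freeze

def effective_settings(options = {})
  normalized = {}
  # map aliased keys to their canonical names, keep all other keys as they are
  options.each { |key, value| normalized[ALIASES.fetch(key, key)] = value }
  # user-supplied values win over defaults
  DEFAULT_SETTINGS.merge(normalized)
end

settings = effective_settings(:host => "hub.megacorp.internal", :username => "hedgehog")
puts settings.inspect
```

Options that are not passed explicitly (port, vhost and so on) keep their default values.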
h3. Using connection strings
It is convenient to be able to specify the AMQP connection
parameters as a URI string, and various "amqp" URI schemes
exist. Unfortunately, there is no standard for these URIs, so
while the schemes share the basic idea, they differ in some
details. This implementation aims to encourage URIs that work
as widely as possible.
Here are some examples:
* amqp://dev.rabbitmq.com
* amqp://dev.rabbitmq.com:5672
* amqp://guest:guest@dev.rabbitmq.com:5672
* amqp://hedgehog:t0ps3kr3t@hub.megacorp.internal/production
* amqps://hub.megacorp.internal/%2Fvault
The URI scheme should be "amqp", or "amqps" if SSL is required.
The host, port, username and password are represented in the
authority component of the URI in the same way as in http URIs.
The vhost is obtained from the first segment of the path, with the
leading slash removed. The path should contain only a single segment
(i.e., the only slash in it should be the leading one). If the vhost
is to include slashes or other reserved URI characters, these should
be percent-escaped.
Here are some examples that demonstrate how
{AMQP::Client.parse_connection_uri} parses out the vhost from
connection URIs:
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com") # => vhost is nil, so default (/) will be used
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/") # => vhost is an empty string
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/%2Fvault") # => vhost is /vault
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/production") # => vhost is production
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/a.b.c") # => vhost is a.b.c
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/foo/bar") # => ArgumentError
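The vhost rules described above can be reproduced with Ruby's standard URI library. The following is a rough sketch, not the code behind {AMQP::Client.parse_connection_uri}: sketch_parse_amqp_uri and percent_decode are hypothetical helpers, and the shape of the returned hash is an assumption made for illustration.

```ruby
require "uri"

# Hypothetical helper: decodes %XX escapes in a URI component.
def percent_decode(string)
  string.gsub(/%([0-9A-Fa-f]{2})/) { $1.hex.chr }
end

# Hypothetical helper that mirrors the rules described in this section.
def sketch_parse_amqp_uri(connection_string)
  uri = URI.parse(connection_string)
  unless %w[amqp amqps].include?(uri.scheme)
    raise ArgumentError, "scheme must be amqp or amqps"
  end

  opts = { :host => uri.host, :ssl => (uri.scheme == "amqps") }
  opts[:port] = uri.port if uri.port
  opts[:user] = percent_decode(uri.user) if uri.user
  opts[:pass] = percent_decode(uri.password) if uri.password

  path = uri.path
  unless path.nil? || path.empty?
    segment = path[1..-1] # drop the leading slash
    if segment.include?("/")
      raise ArgumentError, "path must contain a single segment, got #{path.inspect}"
    end
    opts[:vhost] = percent_decode(segment)
  end
  opts
end

puts sketch_parse_amqp_uri("amqps://hub.megacorp.internal/%2Fvault").inspect
puts sketch_parse_amqp_uri("amqp://hedgehog:t0ps3kr3t@hub.megacorp.internal/production").inspect
```

When the URI has no path at all, the sketch leaves :vhost out of the result so that the caller can fall back to the default.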
h2. Starting event loop & connecting in standalone applications
h3. EventMachine event loop
amqp gem uses "EventMachine":http://rubyeventmachine.com under the hood and needs EventMachine
event loop to be running in order to connect to AMQP broker or send any data. This means that
before connecting to AMQP broker, we need to _start EventMachine reactor_ (get the event loop
going). Here is how to do it:
require "amqp"
EventMachine.run do
  # ...
end
"EventMachine.run":http://eventmachine.rubyforge.org/EventMachine.html#M000461 will block current thread until event loop is stopped.
Standalone applications often can afford starting event loop on the main thread. If you have no experience with threading, this is a
recommended way.
h3. Using AMQP.connect with a block
Once event loop is running, {AMQP.connect} method will attempt to connect to the broker. It can be used in two ways. Here is the
first one:
require "amqp"
EventMachine.run do
  # using AMQP.connect with a block
  AMQP.connect(:host => "localhost") do |client|
    # connection is open and ready to be used
  end
end
{AMQP.connect} takes a block that will be executed as soon as AMQP connection is open (TCP connection was set up,
authentication succeeded, broker and client finished negotiating connection parameters like max frame size).
h3. Using AMQP.connect without a callback
Alternative way of connecting is this:
require "amqp"
EventMachine.run do
  # using AMQP.connect without a block
  client = AMQP.connect(:host => "hub.megacorp.internal", :username => "hedgehog", :password => "t0ps3kr3t")
  # connection is not yet open, but amqp gem will delay
  # channel operations until after connection is open. Still,
  # amqp gem cannot solve every possible race condition so be careful
end
If you do not need to assign returned value to a variable, "block version" is recommended because it eliminates issues that may
arise from attempts to use a connection object that is not fully opened yet. For example, handling of authentication failures is simpler
with the block version, as we will see in the following sections.
h3. Using AMQP.start
EventMachine.run and {AMQP.connect} with a block is such a common combination that amqp gem provides a shortcut:
require "amqp"
AMQP.start("amqp://dev.rabbitmq.com:5672") do |client|
  # connection is open and ready to be used
end
As these examples demonstrate, {AMQP.connect} and {AMQP.start} accept either a Hash of connection options or a connection URI string.
See reference documentation for each method to learn all the options they accept and what the default values are.
h3. On Thread#sleep use
When not passing a block to {AMQP.connect}, it is tempting to "give connection some time to get through" by using Thread#sleep. Unless you are
running event loop in a separate thread, don't do this. Thread#sleep blocks current thread so if event loop is running in the very same thread,
blocking it _will also block the event loop_. *When event loop is blocked, no data is sent or received, so connection does not proceed.*
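The effect is easy to reproduce without any networking. The sketch below uses a toy single-threaded loop written for this illustration (ToyLoop is hypothetical and is not EventMachine). It only shows the general mechanism: a callback that sleeps keeps every later callback on the same thread waiting.

```ruby
# A toy single-threaded event loop; NOT EventMachine, only an illustration.
class ToyLoop
  def initialize
    @callbacks = []
  end

  # schedule a block to run on a later iteration of the loop
  def next_tick(&block)
    @callbacks << block
  end

  # run scheduled blocks one after another on the current thread
  def run
    @callbacks.shift.call until @callbacks.empty?
  end
end

started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
elapsed    = lambda { Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at }

delays = {}
toy    = ToyLoop.new
# a callback that "gives the connection some time" by sleeping
toy.next_tick { sleep(0.2) }
# stands in for the work that would actually move the connection forward
toy.next_tick { delays[:io_callback] = elapsed.call }
toy.run

puts "I/O callback ran after #{delays[:io_callback].round(2)} seconds"
```

The second callback cannot run until the sleeping one returns, which is why sleeping on the event loop thread does not give the connection any time at all.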
h3. Detecting TCP connection failures
When applications connect to the broker, they need to handle connection failures. Networks are not 100% reliable, misconfigurations happen even with modern
system configuration tools like Chef or Puppet, and the broker might be down, too. Error detection should happen as early as possible. There are two ways of detecting
TCP connection failure. The first one is to catch an exception:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> TCP connection failure handling with a rescue statement"
puts
connection_settings = {
  :port     => 9689,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password",
  :timeout  => 0.3
}
begin
  AMQP.start(connection_settings) do |connection, open_ok|
    raise "This should not be reachable"
  end
rescue AMQP::TCPConnectionFailed => e
  puts "Caught AMQP::TCPConnectionFailed => TCP connection failed, as expected."
end
{AMQP.connect} (and {AMQP.start}) will raise {AMQP::TCPConnectionFailed} if connection fails. Code that catches it can log
the issue or use retry to execute the begin block one more time. Because initial connection failures are usually due to misconfiguration or network outage, reconnection
to the same endpoint (hostname, port, vhost combination) is likely to result in the same issue over and over. TBD: failover, connection to the cluster.
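If retry is used, it is worth bounding the number of attempts so that a misconfigured endpoint does not cause an endless loop. Here is a minimal sketch of that idea in plain Ruby. FakeTCPConnectionFailed is a stand-in for {AMQP::TCPConnectionFailed} so that the snippet runs without a broker, and with_connection_retries is a hypothetical helper, not part of the amqp gem.

```ruby
# Stand-in for AMQP::TCPConnectionFailed so the sketch runs without the gem.
class FakeTCPConnectionFailed < StandardError; end

# Hypothetical helper: runs the block, retrying a limited number of times
# when the stand-in connection error is raised.
def with_connection_retries(max_attempts, base_delay)
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue FakeTCPConnectionFailed
    # give up and re-raise once the attempt budget is used up
    raise if attempt >= max_attempts
    # wait a little longer before each new attempt
    sleep(base_delay * attempt)
    retry
  end
end

attempts_seen = []
result = with_connection_retries(3, 0.01) do |attempt|
  attempts_seen << attempt
  # pretend the broker only becomes reachable on the third attempt
  raise FakeTCPConnectionFailed, "broker unreachable" if attempt < 3
  :connected
end

puts "result: #{result.inspect}, attempts: #{attempts_seen.inspect}"
```

In a real application the block would call AMQP.start with the connection settings and the rescue clause would name {AMQP::TCPConnectionFailed}.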
Alternative way of handling connection failure is with an errback (a callback for specific kind of error):
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> TCP connection failure handling with a callback"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 9689,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler
}
AMQP.start(connection_settings) do |connection, open_ok|
  raise "This should not be reachable"
end
:on_tcp_connection_failure option accepts any object that responds to #call.
If you connect to the broker from code in a class (as opposed to top-level scope in a script), Object#method can be used to pass an object's method as a handler
instead of a Proc.
TBD: provide an example
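A minimal sketch of what this could look like is given below. BrokerConnector and its methods are hypothetical names made up for this illustration. To keep the snippet runnable without a broker, the handler is invoked by hand at the end instead of by the amqp gem.

```ruby
# Hypothetical class, used only to illustrate Object#method as a handler.
class BrokerConnector
  attr_reader :failures

  def initialize
    @failures = []
  end

  # settings in the shape that would be passed to AMQP.start or AMQP.connect
  def connection_settings
    {
      :host    => "localhost",
      :port    => 9689,
      :timeout => 0.3,
      # Object#method returns a Method object, which responds to #call
      :on_tcp_connection_failure => method(:handle_tcp_connection_failure)
    }
  end

  def handle_tcp_connection_failure(settings)
    @failures << settings
    puts "Failed to connect to #{settings[:host]}:#{settings[:port]}"
  end
end

connector = BrokerConnector.new
settings  = connector.connection_settings
handler   = settings[:on_tcp_connection_failure]

# simulate the failure callback being invoked with the connection settings
handler.call(settings)
```

Because the handler is a bound method, it has access to the object's state, which is handy for logging or for counting failed attempts.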
h3. Detecting authentication failures
Another reason why connection may fail is authentication failure. Handling authentication failure is very similar to handling initial TCP
connection failure:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> Authentication failure handling with a callback"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 5672,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler,
  :on_possible_authentication_failure => Proc.new { |settings|
    puts "Authentication failed, as expected, settings are: #{settings.inspect}"
    EM.stop
  }
}
AMQP.start(connection_settings) do |connection, open_ok|
  raise "This should not be reachable"
end
When no :on_possible_authentication_failure handler is given, the default handler raises {AMQP::PossibleAuthenticationFailureError}:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> Authentication failure handling with a rescue block"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 5672,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler
}
begin
  AMQP.start(connection_settings) do |connection, open_ok|
    raise "This should not be reachable"
  end
rescue AMQP::PossibleAuthenticationFailureError => afe
  puts "Authentication failed, as expected, caught #{afe.inspect}"
  EventMachine.stop if EventMachine.reactor_running?
end
In case you wonder why the callback name has "possible" in it: {http://bit.ly/mTr1YN AMQP 0.9.1 spec} requires broker implementations to
simply close TCP connection without sending any more data when an exception (such as authentication failure) occurs before AMQP connection
is open, so the client is not told the reason. In practice, however, when broker closes TCP connection after it was successfully established but before
AMQP connection is open, it means that authentication has failed.
h2. Starting event loop & connecting in Web applications (Ruby on Rails, Sinatra, Merb, Rack)
Web applications are different from standalone applications in that main thread is occupied by Web/application server like Unicorn
or Thin, so you need to start EventMachine reactor before you attempt to use {AMQP.connect}.
In a Ruby on Rails app, probably the best place for this is in initializer (like config/initializers/amqp.rb). For Merb apps, it is config/init.rb.
For Sinatra and pure Rack applications, place it next to other configuration code.
Next we are going to discuss issues specific to particular Web servers.
h3. Using Ruby amqp gem with Unicorn
h4. Unicorn is a pre-forking server
"Unicorn":http://unicorn.bogomips.org is a pre-forking server. That means it forks worker processes that serve HTTP requests. The "fork(2)":http://en.wikipedia.org/wiki/Fork_(operating_system) system call
has several gotchas associated with it, two of which affect EventMachine and "Ruby amqp gem":http://github.com/ruby-amqp/amqp:
* Unintentional file descriptor sharing
* The fact that "forked child process only inherits one thread":http://bit.ly/fork-and-threads (and EventMachine thread thus is not inherited)
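The second point can be observed with plain Ruby on a platform that supports fork. In the sketch below, a background thread stands in for the thread that would run the EventMachine event loop. It is an illustration only and uses neither Unicorn nor EventMachine.

```ruby
# Background thread standing in for the thread that would run the event loop.
background = Thread.new { sleep }

reader, writer = IO.pipe
pid = fork do
  reader.close
  # in the child, only the thread that called fork is still running
  writer.puts "#{Thread.list.size},#{background.alive?}"
  writer.close
  # leave the child immediately, skipping at_exit handlers inherited from the parent
  exit!(0)
end

writer.close
child_report = reader.read.strip
reader.close
Process.wait(pid)

puts "parent: background thread alive? #{background.alive?}"
puts "child (thread count, background thread alive?): #{child_report}"
background.kill
```

The parent still sees its background thread alive, while the child reports a single thread and a dead background thread. The same thing would happen to a reactor thread started before the fork.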
To avoid both problems, start EventMachine reactor and AMQP connection *after* master process forks workers. Master Unicorn process never serves HTTP requests and usually
doesn't need to hold an AMQP connection. Next, let's see how to spin up EventMachine reactor and connect to the broker after Unicorn forks a worker.
h4. Starting EventMachine reactor and connecting to the broker after Unicorn forks worker processes
Unicorn lets you specify a configuration file to use. In that file, you define a callback Unicorn runs after it forks worker process(es):
ENV["FORKING"] = "true"
listen 3000
worker_processes 1
timeout 30
preload_app true
after_fork do |server, worker|
  require "amqp"
  # the following is *required* for Rails + "preload_app true",
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.establish_connection
  t = Thread.new { AMQP.start }
  sleep(1.0)
  EventMachine.next_tick do
    AMQP.channel ||= AMQP::Channel.new(AMQP.connection)
    AMQP.channel.queue("amqpgem.examples.rails23.warmup", :durable => true)
    3.times do |i|
      puts "[after_fork/amqp] Publishing a warmup message ##{i}"
      AMQP.channel.default_exchange.publish("A warmup message #{i} from #{Time.now.strftime('%H:%M:%S %m/%b/%Y')}", :routing_key => "amqpgem.examples.rails23.warmup")
    end
  end
end
In the example above we start EventMachine reactor in a separate thread, block current thread for 1 second to let event loop spin up and then
connect to AMQP broker on the next event loop tick. Publishing several warmup messages on boot is a good idea because it
lets you detect issues that forking may cause earlier.
Note that configuration file can easily be used in development environments: other than the fact that Unicorn runs in the foreground,
it gives you exactly the same application boot behavior as in QA and production environments, which is a good thing.
An "example Ruby on Rails application that uses Ruby amqp gem and Unicorn":http://bit.ly/ruby-amqp-gem-example-with-ruby-on-rails-and-unicorn is available on GitHub.
h3. Using Ruby amqp gem with Passenger
Phusion Passenger is a pre-forking server, much like Unicorn. Just like with Unicorn, EventMachine reactor and AMQP connection should be started *after* it forks worker
processes. Passenger documentation has "a section":http://bit.ly/passenger-forking-gotchas that explains how to avoid problems related to the behavior of fork(2) system
call, namely:
* Unintentional file descriptor sharing
* The fact that "forked child process only inherits one thread":http://bit.ly/fork-and-threads (and EventMachine thread thus is not inherited)
TBD: if you are a Passenger user, please help us write the rest of this section!
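Until this section is written, here is one possible starting point, offered as an untested sketch rather than a verified recipe. Passenger's documentation describes a PhusionPassenger.on_event(:starting_worker_process) hook whose block receives a forked flag. Assuming the hook behaves as documented, the reactor and the connection could be started there, mirroring the Unicorn after_fork example. Starting AMQP.start in a separate thread is borrowed from that example, and where the snippet should live (for instance, an initializer) is an assumption.

```ruby
# Sketch only: this is a no-op when the application is not run by Passenger.
if defined?(PhusionPassenger)
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    # forked is true when this worker was forked from a preloaded parent,
    # so it must not reuse a reactor or connection created before the fork
    if forked
      require "amqp"
      # same approach as in the Unicorn example: run the reactor in its own thread
      Thread.new { AMQP.start }
    end
  end
end
```

As with Unicorn, publishing a few warmup messages after the connection is open can help detect fork-related issues early.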
h3. Using Ruby amqp gem with Thin and Goliath
h4. Thin and Goliath start EventMachine reactor for you, but there is a little nuance
If you use "Thin":http://code.macournoyer.com/thin/ or "Goliath":https://github.com/postrank-labs/goliath/, you are all set: those two servers use EventMachine under the hood.
There is no need to start EventMachine reactor. However, depending on app server, its version, version of the framework and Rack middleware being used,
EventMachine reactor start may be slightly delayed. To not depend on this factor, use EventMachine.next_tick to delay connection until after reactor is actually running:
EventMachine.next_tick { AMQP.connect(...) }
So in case EventMachine reactor isn't running yet on server/application boot, connection won't fail but instead wait for reactor to start.
Thin and Goliath are not pre-forking servers so there is no need to re-establish connection the way you do it with Unicorn and Passenger.
h2. If it just doesn't work: troubleshooting
If you read this guide yet your issue is still unresolved, check our {file:docs/Troubleshooting.textile Troubleshooting guide} before asking on the mailing list.
h2. What to read next
* {file:docs/Queues.textile Queues}
* {file:docs/ErrorHandling.textile Error handling}
* {file:docs/ConnectionEncryptionWithTLS.textile Using TLS (SSL)} (if you want to use SSL encrypted connection to the broker)
h2. Tell us what you think!
Please take a moment and tell us what you think about this guide on "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp:
what was unclear? What wasn't covered? Maybe you don't like the guide style, or grammar and spelling are incorrect? Reader feedback is
key to making documentation better.
If mailing list communication is not an option for you for some reason, you can "contact the guide's author directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation