# @title Ruby amqp gem: Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
h1. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
h2. About this guide
This guide covers connection to an AMQP broker from standalone and Web applications, connection error handling, authentication failure handling and related issues.
h2. Which versions of the amqp gem does this guide cover?
This guide covers v0.8.0 and later of the "Ruby amqp gem":http://github.com/ruby-amqp/amqp.
h2. Terminology
In this guide we define a standalone application as an application that does not run on a Web server like Unicorn or Passenger. The key difference is that these applications control the main Ruby VM thread and often use it to run the EventMachine event loop. When the amqp gem is used in a Web application, the main thread is occupied by the Web application server and the code required to establish a connection to an AMQP broker needs to be a little bit different.
h2. Two ways to specify connection parameters
Connection parameters (host, port, username, vhost and so on) can be passed in two forms:
* As a hash
* As a connection URI string (à la JDBC)
h3. Using a hash
Hash options that the amqp gem will recognize are
* :host
* :port
* :username (aliased as :user)
* :password (aliased as :pass)
* :vhost
* :ssl
* :timeout
* :frame_max
h4. Default parameters
Default connection parameters are
{
  :host      => "127.0.0.1",
  :port      => 5672,
  :user      => "guest",
  :pass      => "guest",
  :vhost     => "/",
  :ssl       => false,
  :frame_max => 131072
}
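To make the relationship between defaults, aliases and explicit options concrete, here is a minimal sketch. The names @DEFAULT_SETTINGS@, @ALIASES@ and @effective_settings@ are hypothetical and exist only for this illustration; the gem's own option handling may differ in detail.

```ruby
# Illustration only: these names are hypothetical and are not part of the amqp gem.
DEFAULT_SETTINGS = {
  :host      => "127.0.0.1",
  :port      => 5672,
  :user      => "guest",
  :pass      => "guest",
  :vhost     => "/",
  :ssl       => false,
  :frame_max => 131072
}.freeze

# Long option names mapped to the short names used in the defaults above.
ALIASES = { :username => :user, :password => :pass }.freeze

# Rewrites aliased keys to their short form, then lets explicit options
# override the defaults.
def effective_settings(options = {})
  normalized = options.each_with_object({}) do |(key, value), acc|
    acc[ALIASES.fetch(key, key)] = value
  end
  DEFAULT_SETTINGS.merge(normalized)
end

settings = effective_settings(:host => "hub.megacorp.internal", :username => "hedgehog")
puts settings.inspect
```

Options that are not given (here the port, password and vhost) keep their default values.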
h3. Using connection strings
It is convenient to be able to specify the AMQP connection
parameters as a URI string, and various "amqp" URI schemes
exist. Unfortunately, there is no standard for these URIs, so
while the schemes share the same basic idea, they differ in some
details. This implementation aims to encourage URIs that work
as widely as possible.
Here are some examples:
* amqp://dev.rabbitmq.com
* amqp://dev.rabbitmq.com:5672
* amqp://guest:guest@dev.rabbitmq.com:5672
* amqp://hedgehog:t0ps3kr3t@hub.megacorp.internal/production
* amqps://hub.megacorp.internal/%2Fvault
The URI scheme should be "amqp", or "amqps" if SSL is required.
The host, port, username and password are represented in the
authority component of the URI in the same way as in http URIs.
The vhost is obtained from the first segment of the path, with the
leading slash removed. The path should contain only a single segment
(i.e., the only slash in it should be the leading one). If the vhost
is to include slashes or other reserved URI characters, these should
be percent-escaped.
Here are some examples that demonstrate how
{AMQP::Client.parse_connection_uri} parses out the vhost from
connection URIs:
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com") # => vhost is nil, so default ("/") will be used
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/") # => vhost is an empty string
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/%2Fvault") # => vhost is "/vault"
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/production") # => vhost is "production"
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/a.b.c") # => vhost is "a.b.c"
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/foo/bar") # => ArgumentError
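The rules above can be approximated with Ruby's standard uri library. The sketch below is not the gem's implementation: @settings_from_uri@ and @vhost_from_path@ are hypothetical helpers written only to make the rules concrete, and they assume that the decoded vhost contains only ASCII characters.

```ruby
require "uri"

# Hypothetical helper (not part of the amqp gem) that applies the rules
# described above to a connection URI.
def settings_from_uri(connection_string)
  uri = URI.parse(connection_string)
  unless %w[amqp amqps].include?(uri.scheme)
    raise ArgumentError, "expected an amqp or amqps URI, got #{connection_string.inspect}"
  end

  {
    :ssl   => uri.scheme == "amqps",
    :host  => uri.host,
    :port  => uri.port,       # nil when the URI does not specify a port
    :user  => uri.user,       # nil when the URI has no userinfo part
    :pass  => uri.password,
    :vhost => vhost_from_path(uri.path)
  }
end

# Returns nil for a missing path, otherwise the single path segment with the
# leading slash removed and percent-escapes decoded.
def vhost_from_path(path)
  return nil if path.nil? || path.empty?

  segment = path[1..-1]
  if segment.include?("/")
    raise ArgumentError, "the path must contain a single segment, got #{path.inspect}"
  end

  segment.gsub(/%([0-9A-Fa-f]{2})/) { Regexp.last_match(1).hex.chr }
end

p settings_from_uri("amqps://hub.megacorp.internal/%2Fvault")
```

A nil value for :port or :vhost means that the URI did not specify one, so the caller would fall back to the defaults.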
h2. Starting the event loop and connecting in standalone applications
h3. EventMachine event loop
The amqp gem uses "EventMachine":http://rubyeventmachine.com under the hood and needs an EventMachine
event loop to be running in order to connect to an AMQP broker or to send any data. This means that
before connecting to an AMQP broker, we need to _start the EventMachine reactor_ (get the event loop
going). Here is how to do it:
require "amqp"
EventMachine.run do
  # ...
end
"EventMachine.run":http://eventmachine.rubyforge.org/EventMachine.html#M000461 will block the current thread until the event loop is stopped.
Standalone applications can often afford to start the event loop on the main thread. If you have no experience with threading, this is the
recommended way to proceed.
h3. Using AMQP.connect with a block
Once the event loop is running, the {AMQP.connect} method will attempt to connect to the broker. It can be used in two ways. Here is the
first one:
require "amqp"
EventMachine.run do
  # using AMQP.connect with a block
  AMQP.connect(:host => "localhost") do |client|
    # connection is open and ready to be used
  end
end
{AMQP.connect} takes a block that will be executed as soon as the AMQP connection is open. For a connection to be opened, a TCP connection has to be set up,
authentication has to succeed, and the broker and client need to complete negotiation of connection parameters like max frame size.
h3. Using AMQP.connect without a callback
An alternative way of connecting is this:
require "amqp"
EventMachine.run do
  # using AMQP.connect without a block
  client = AMQP.connect(:host => "hub.megacorp.internal", :username => "hedgehog", :password => "t0ps3kr3t")
  # connection is not yet open, however, amqp gem will delay channel
  # operations until after the connection is open. Bear in mind that
  # amqp gem cannot solve every possible race condition so be careful
end
If you do not need to assign the returned value to a variable, then the "block version" is recommended because it eliminates issues that may
arise from attempts to use a connection object that is not fully opened yet. For example, handling of authentication failures is simpler
with the block version, as we will see in the following sections.
h3. Using AMQP.start
EventMachine.run and {AMQP.connect} with a block is such a common combination that the amqp gem provides a shortcut:
require "amqp"
AMQP.start("amqp://dev.rabbitmq.com:5672") do |client|
  # connection is open and ready to be used
end
As these examples demonstrate, {AMQP.connect} and {AMQP.start} accept either a Hash of connection options or a connection URI string.
See the reference documentation for each method to learn all of the options that they accept and what the default values are.
h3. On Kernel#sleep use
When not passing a block to {AMQP.connect}, it is tempting to "give the connection some time to become established" by calling Kernel#sleep. Unless you are
running the event loop in a separate thread, please do not do this. Kernel#sleep blocks the current thread, so if the event loop is running in that thread,
blocking the thread _will also block the event loop_. *When the event loop is blocked, no data is sent or received, so the connection does not proceed.*
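The effect can be reproduced without EventMachine. The sketch below uses a deliberately simplified, hypothetical @ToyReactor@ (one thread draining a queue of callbacks, which is an assumption made for illustration and not how EventMachine is implemented) to show that a callback which sleeps delays every callback queued behind it.

```ruby
# Toy stand-in for an event loop: callbacks run one after another on a single
# thread. Hypothetical, for illustration only; EventMachine works differently.
class ToyReactor
  def initialize
    @callbacks = Queue.new
  end

  # Schedules a block to run on a later pass of the loop.
  def next_tick(&block)
    @callbacks << block
  end

  # Runs every queued callback on the calling thread, then returns.
  def run
    @callbacks.pop.call until @callbacks.empty?
  end
end

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

reactor    = ToyReactor.new
started_at = monotonic_now
delay      = nil

reactor.next_tick { sleep(0.2) }                          # blocks the loop thread
reactor.next_tick { delay = monotonic_now - started_at }  # cannot run until sleep returns
reactor.run

puts "second callback ran after #{delay.round(2)}s"
```

The second callback does no waiting of its own, yet it runs only after the first one has finished sleeping. Network I/O handled by a real event loop is held up in the same way.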
h3. Detecting TCP connection failures
When applications connect to the broker, they need to handle connection failures. Networks are not 100% reliable and even with modern system configuration tools,
like "Chef":http://www.opscode.com/chef or "Puppet":http://www.puppetlabs.com, misconfigurations can happen. Also, the broker might be down for some reason. Ideally, error detection should happen as early as possible. There are two ways of detecting
TCP connection failure. The first one is to catch an exception:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> TCP connection failure handling with a rescue statement"
puts
connection_settings = {
  :port     => 9689,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password",
  :timeout  => 0.3
}
begin
  AMQP.start(connection_settings) do |connection, open_ok|
    raise "This should not be reachable"
  end
rescue AMQP::TCPConnectionFailed => e
  puts "Caught AMQP::TCPConnectionFailed => TCP connection failed, as expected."
end
{AMQP.connect} (and {AMQP.start}) will raise {AMQP::TCPConnectionFailed} if the connection fails. Code that catches the error can write to a log
about the issue or use retry to execute the begin block one more time. Because initial connection failures are usually caused by misconfiguration or a network outage, reconnecting
to the same endpoint (hostname, port, vhost combination) is likely to result in the same error over and over again. TBD: failover, connection to the cluster.
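Since retrying the same endpoint is likely to fail again, any use of retry should be bounded. The following is a minimal sketch of one way to do that; @with_retries@ is a hypothetical helper that is not part of the amqp gem, and the attempt count and delays are arbitrary values chosen for illustration.

```ruby
# Hypothetical helper, not part of the amqp gem: runs the block and retries
# it when error_class is raised, up to max_attempts times in total.
def with_retries(error_class, max_attempts: 3, base_delay: 0.5)
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue error_class
    raise if attempt >= max_attempts   # give up and let the caller see the error
    sleep(base_delay * attempt)        # wait a little longer before each retry
    retry
  end
end

# With the amqp gem loaded, usage might look like this:
#
#   with_retries(AMQP::TCPConnectionFailed, max_attempts: 5) do
#     AMQP.start(connection_settings) do |connection|
#       # connection is open and ready to be used
#     end
#   end
```

Once the attempts are exhausted the original exception propagates, so the application can log it and exit instead of looping forever.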
An alternative way of handling connection failure is with an _errback_ (a callback for a specific kind of error):
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> TCP connection failure handling with a callback"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 9689,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler
}
AMQP.start(connection_settings) do |connection, open_ok|
  raise "This should not be reachable"
end
The :on_tcp_connection_failure option accepts any object that responds to #call.
If you connect to the broker from code in a class (as opposed to top-level scope in a script), Object#method can be used to pass an instance method as a handler
instead of a Proc.
TBD: provide an example
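One way this could look is sketched below. @BrokerConnector@ and its methods are hypothetical names used only for illustration; the only piece taken from the amqp gem is the :on_tcp_connection_failure option. The handler takes a single settings argument, which is an assumption based on the Proc examples above.

```ruby
# Hypothetical class for illustration; only the :on_tcp_connection_failure
# option itself comes from the amqp gem.
class BrokerConnector
  attr_reader :failed_settings

  def initialize(settings)
    @settings        = settings
    @failed_settings = nil
  end

  # Object#method returns a Method object, which responds to #call.
  def connection_settings
    @settings.merge(:on_tcp_connection_failure => method(:handle_tcp_connection_failure))
  end

  # Assumed to receive the connection settings, like the Procs shown earlier.
  def handle_tcp_connection_failure(settings)
    @failed_settings = settings
    warn "Could not connect to #{settings[:host]}:#{settings[:port]}"
    # A real application might also log the failure and stop the reactor here.
  end
end

connector = BrokerConnector.new(:host => "localhost", :port => 9689, :timeout => 0.3)
settings  = connector.connection_settings
# With the amqp gem loaded: AMQP.start(settings) { |connection| ... }
```

Unlike a Proc, a Method object checks its argument count strictly, so the handler's parameter list has to match what it is called with.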
h3. Detecting authentication failures
A connection may also fail due to authentication failure. Handling authentication failure is very similar to handling an initial TCP
connection failure:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> Authentication failure handling with a callback"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 5672,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler,
  :on_possible_authentication_failure => Proc.new { |settings|
    puts "Authentication failed, as expected, settings are: #{settings.inspect}"
    EM.stop
  }
}
AMQP.start(connection_settings) do |connection, open_ok|
  raise "This should not be reachable"
end
If no :on_possible_authentication_failure handler is given, the default handler raises {AMQP::PossibleAuthenticationFailureError}:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> Authentication failure handling with a rescue block"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 5672,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler
}
begin
  AMQP.start(connection_settings) do |connection, open_ok|
    raise "This should not be reachable"
  end
rescue AMQP::PossibleAuthenticationFailureError => afe
  puts "Authentication failed, as expected, caught #{afe.inspect}"
  EventMachine.stop if EventMachine.reactor_running?
end
In case you are wondering why the callback name has "possible" in it, the {http://bit.ly/mTr1YN AMQP 0.9.1 spec} requires broker implementations to
simply close the TCP connection without sending any more data when an exception, such as authentication failure, occurs before the AMQP connection
is open. The client therefore cannot know for certain why the connection was closed. In practice, however, when a broker closes a TCP connection after a successful TCP connection has been established but before an AMQP connection is open,
it means that authentication has failed.
h2. Starting the event loop and connecting in Web applications (Ruby on Rails, Sinatra, Merb, Rack)
Web applications are different from standalone applications in that the main thread is occupied by a Web/application server like Unicorn
or Thin, so you need to start the EventMachine reactor before you attempt to use {AMQP.connect}.
In a Ruby on Rails application, probably the best place for this is an initializer (such as config/initializers/amqp.rb). For Merb applications it is config/init.rb.
For Sinatra and pure Rack applications, place it next to the other configuration code.
Next, we are going to discuss issues specific to particular Web servers.
h3. Using Ruby amqp gem with Unicorn
h4. Unicorn is a pre-forking server
"Unicorn":http://unicorn.bogomips.org is a pre-forking server. That means it forks worker processes that serve HTTP requests. The "fork(2)":http://en.wikipedia.org/wiki/Fork_(operating_system) system call
has several gotchas associated with it, two of which affect EventMachine and the "Ruby amqp gem":http://github.com/ruby-amqp/amqp:
* Unintentional file descriptor sharing
* The fact that a "forked child process only inherits one thread":http://bit.ly/fork-and-threads and therefore the EventMachine thread is not inherited
To avoid both problems, start the EventMachine reactor and AMQP connection *after* the master process forks workers. The master Unicorn process never serves HTTP requests and usually
does not need to hold an AMQP connection. Next, let us see how to spin up the EventMachine reactor and connect to the broker after Unicorn forks a worker.
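The second gotcha is easy to observe with plain Ruby on a platform that supports fork(2). The sketch below uses only the standard library. A sleeping thread stands in for the thread that would run the EventMachine reactor, and the helper name @thread_state_in_forked_child@ is made up for this illustration.

```ruby
# Reports, from inside a forked child, what happened to a thread that was
# started in the parent before the fork. Requires a platform with fork(2).
def thread_state_in_forked_child
  background = Thread.new { sleep }   # stand-in for a reactor thread
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    writer.puts(background.alive?)    # state of the parent's thread in the child
    writer.puts(Thread.list.size)     # number of threads that exist in the child
    writer.close
    exit!(0)                          # skip at_exit handlers inherited from the parent
  end

  writer.close
  report = reader.read.split("\n")
  reader.close
  Process.wait(pid)
  background.kill

  { :alive_in_child => report[0] == "true", :child_thread_count => Integer(report[1]) }
end

p thread_state_in_forked_child
```

The child should report that the parent's thread is not alive and that it has a single thread of its own. This is why a reactor started before the fork does not keep running in the workers.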
h4. Starting the EventMachine reactor and connecting to the broker after Unicorn forks worker processes
Unicorn lets you specify a configuration file to use. In that file you define a callback that Unicorn runs after it forks worker process(es):
ENV["FORKING"] = "true"
listen 3000
worker_processes 1
timeout 30
preload_app true
after_fork do |server, worker|
  require "amqp"
  # the following is *required* for Rails + "preload_app true",
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.establish_connection
  t = Thread.new { AMQP.start }
  sleep(1.0)
  EventMachine.next_tick do
    AMQP.channel ||= AMQP::Channel.new(AMQP.connection)
    AMQP.channel.queue("amqpgem.examples.rails23.warmup", :durable => true)
    3.times do |i|
      puts "[after_fork/amqp] Publishing a warmup message ##{i}"
      AMQP.channel.default_exchange.publish("A warmup message #{i} from #{Time.now.strftime('%H:%M:%S %m/%b/%Y')}", :routing_key => "amqpgem.examples.rails23.warmup")
    end
  end
end
In the example above we start the EventMachine reactor in a separate thread, block the current thread for 1 second to let the event loop spin up and then
connect to the AMQP broker on the next event loop tick. Publishing several warmup messages on boot is a good idea because it
allows the early detection of issues that forking may cause.
Note that a configuration file can easily be used in development environments because, other than the fact that Unicorn runs in the foreground,
it gives you exactly the same application boot behavior as in QA and production environments.
An "example Ruby on Rails application that uses the Ruby amqp gem and Unicorn":http://bit.ly/ruby-amqp-gem-example-with-ruby-on-rails-and-unicorn is available on GitHub.
h3. Using the Ruby amqp gem with Passenger
"Phusion Passenger":http://www.modrails.com is also a pre-forking server, and just as with Unicorn, the EventMachine reactor and AMQP connection should be started *after* it forks worker
processes. The Passenger documentation has "a section":http://bit.ly/passenger-forking-gotchas that explains how to avoid problems related to the behavior of the fork(2) system
call, namely:
* Unintentional file descriptor sharing
* The fact that a "forked child process only inherits one thread":http://bit.ly/fork-and-threads and therefore the EventMachine thread is not inherited
h4. Using an event handler to spawn one amqp connection per worker
Passenger provides a hook that you should use for spawning AMQP connections:
if defined?(PhusionPassenger) # otherwise it breaks rake commands if you put this in an initializer
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    if forked
      # We're in a smart spawning mode
      # Now is a good place to connect to the broker
    end
  end
end
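As a sketch of what the body of that hook might contain, the following mirrors the Unicorn example above: start the reactor in a separate thread, then open a channel on the next tick. The method name @install_amqp_passenger_hook@ is hypothetical, and the one-second sleep is carried over from the Unicorn example rather than being a tuned value.

```ruby
# Hypothetical helper: registers the hook when running under Passenger and
# returns false otherwise (for example when rake loads the initializer).
def install_amqp_passenger_hook
  return false unless defined?(PhusionPassenger)

  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    next unless forked          # only act when the worker was forked (smart spawning)

    require "amqp"
    Thread.new { AMQP.start }   # run the EventMachine reactor in its own thread
    sleep(1.0)                  # crude wait for the reactor, as in the Unicorn example

    EventMachine.next_tick do
      AMQP.channel ||= AMQP::Channel.new(AMQP.connection)
    end
  end
  true
end

install_amqp_passenger_hook
```

Wrapping the registration in a method keeps the initializer harmless outside Passenger, where the call does nothing.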
In smart spawning mode, which is the recommended default, this works exactly as it does with Unicorn (with all of the same common pitfalls). An "example application":http://bit.ly/ruby-amqp-gem-example-with-ruby-on-rails-and-passenger is available on GitHub.
h3. Using the Ruby amqp gem with Thin and Goliath
h4. Thin and Goliath start the EventMachine reactor for you, but there is a little nuance
If you use "Thin":http://code.macournoyer.com/thin/ or "Goliath":https://github.com/postrank-labs/goliath/, you are all set because those two servers use EventMachine under the hood.
There is no need to start the EventMachine reactor. However, depending on the application server, its version, the version of the framework and the Rack middleware being used,
the start of the EventMachine reactor may be slightly delayed. To overcome this potential difficulty, use EventMachine.next_tick to delay connection until after the reactor is actually running:
EventMachine.next_tick { AMQP.connect(...) }
So, in case the EventMachine reactor is not yet running on server/application boot, the connection will not fail but will instead wait for the reactor to start.
Thin and Goliath are not pre-forking servers so there is no need to re-establish the connection as you do with Unicorn and Passenger.
h2. If it just does not work: troubleshooting
If you have read this guide and your issue is still unresolved, check our {file:docs/Troubleshooting.textile Troubleshooting guide} before asking on the mailing list.
h2. What to read next
* {file:docs/Queues.textile Working With Queues}. This guide focuses on features consumer applications use heavily.
* {file:docs/Exchanges.textile Working With Exchanges}. This guide focuses on features producer applications use heavily.
* {file:docs/ErrorHandling.textile Error handling}
* {file:docs/ConnectionEncryptionWithTLS.textile Using TLS (SSL)} (if you want to use an SSL encrypted connection to the broker)
h2. Tell us what you think!
Please take a moment to tell us what you think about this guide "on Twitter":http://twitter.com/rubyamqp or the "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp.
Let us know what was unclear or what has not been covered. Maybe you do not like the guide style or grammar or discover spelling mistakes. Reader feedback is
key to making the documentation better.
If, for some reason, you cannot use the communication channels mentioned above, you can "contact the author of the guides directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation