h1. Connecting to the broker h2. About this guide This guide covers connection to AMQP broker from standalone and Web applications, connection error handling, authentication failure handling and related issues.. h2. Covered versions This guide covers amqp gem v0.8.0 and later. h2. Terminology In this guide we define standalone application as application that does not run on a Web server like Unicorn or Passenger. The key difference is that these applications control main Ruby VM thread and often use it to run EventMachine event loop. When amqp gem is used inside of a Web applications, main thread is occupied by Web application server and code required to establish connection to AMQP broker needs to be a little bit different. h2. Two ways to specify connection parameters Connection parameters (host, port, username, vhost and so on) can be passed in two forms: * As a hash * As a connection URI string (à la JDBC) h3. Using a hash Hash options amqp gem will recognize are * :host * :port * :username (aliased as :user) * :password (aliased as :pass) * :vhost * :ssl * :timeout * :frame_max h4. Default parameters Default connection parameters are
{
:host => "127.0.0.1",
:port => 5672,
:user => "guest",
:pass => "guest",
:vhost => "/",
:ssl => false,
:frame_max => 131072
}
h3. Using connection strings
It is possible to use connection URIs (à la JDBC) with amqp and amqps schemas, for example:
* amqp://dev.rabbitmq.com
* amqp://dev.rabbitmq.com:5672
* amqp://guest:guest@dev.rabbitmq.com:5672
* amqp://hedgehog:t0ps3kr3t@hub.megacorp.internal/production
* amqps://hub.megacorp.internal//
Two supported schemas are *amqp* and *amqps*. Default port for amqp is 5672. If port isn't given, and schema
is amqps, amqp gem will assume it has to connect to 5671, default port for amqps.
h4. Vhosts: naming schemas and parsing
AMQP 0.9.1 spec does not define what vhost naming scheme should be. RabbitMQ and Apache Qpid use different schemes
(Qpid said to have two) but the bottom line is: even though some brokers use / as the default vhost, it can be *any string*.
Host (and optional port) part must be separated from vhost (path component) with a slash character (/).
Here are some examples that demonstrate how {AMQP::Client.parse_connection_uri} parses out vhost from connection URI:
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com") # => vhost is nil, so default (/) will be used
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/") # => vhost is an empty string
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com//") # => vhost is /
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com//vault") # => vhost is /vault
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/%2Fvault") # => vhost is /vault
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/production") # => vhost is production
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/a.b.c") # => vhost is a.b.c
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com///a/b/c/d") # => vhost is //a/b/c/d
{AMQP::Client.parse_connection_uri} will also unescape path part of the URI.
h2. Starting event loop & connecting in standalone applications
h3. EventMachine event loop
amqp gem uses "EventMachine":http://rubyeventmachine.com under the hood and needs EventMachine
event loop to be running in order to connect to AMQP broker or send any data. This means that
before connecting to AMQP broker, we need to _start EventMachine reactor_ (get the event loop
going). Here is how to do it:
require "amqp"
EventMachine.run do
# ...
end
"EventMachine.run":http://eventmachine.rubyforge.org/EventMachine.html#M000461 will block current thread until event loop is stopped.
Standalone applications often can afford starting event loop on the main thread. If you have no experience with threading, this is a
recommended way.
h3. Using AMQP.connect with a block
Once event loop is running, {AMQP.connect} method will attempt to connect to the broker. It can be used in two ways. Here is the
first one:
require "amqp"
EventMachine.run do
# using AMQP.connect with a block
AMQP.connect(:host => "localhost") do |client|
# connection is open and ready to be used
end
end
{AMQP.connect} takes a block that will be executed as soon as AMQP connection is open (TCP connection was set up,
authentication succeeded, broker and client finished negotiating connection parameters like max frame size).
h3. Using AMQP.connect without a callback
Alternative way of connecting is this:
require "amqp"
EventMachine.run do
# using AMQP.connect with a block
client = AMQP.connect(:host => "hub.megacorp.internal", :username => "hedgehog", :password => "t0ps3kr3t")
# connection is not yet open, however, amqp gem will delay
# channel operations until after connection is open. However,
# amqp gem cannot solve every possible race condition so be careful
end
If you do not need to assign returned value to a variable, "block version" is recommended because it eliminates issues that may
arise from attempts to use a connection object that is not fully opened yet. For example, handling of authentication failures is simpler
with the block version, as we will see in the following sections.
h3. Using AMQP.start
EventMachine.run and {AMQP.connect} with a block is such a common combination that amqp gem provides a shortcut:
require "amqp"
AMQP.start("amqp://dev.rabbitmq.com:5672") do |client|
# connection is open and ready to be used
end
As these examples demonstrate, {AMQP.connect} and {AMQP.start} accept either a Hash of connection options or a connection URI string.
See reference documentation for each method to learn all the options they accept and what the default values are.
h3. On Thread#sleep use
When not passing a block to {AMQP.connect}, it is tempting to "give connection some time to get through" by using Thread#sleep. Unless you are
running event loop in a separate thread, don't do this. Thread#sleep blocks current thread so if event loop is running the very same current thread,
blocking it _will also block the event loop_. *When event loop is blocked, no data is sent or received, so connection does not proceed.*
h3. Detecting TCP connection failures
When applications connect to the broker, they need to handle connection failures. Networks are not 100% reliable, even with modern system configuration tools
like Chef or Puppet misconfigurations happen and broker might be down, too. Error detection should happen as early as possible. There are two ways of detecting
TCP connection failure, the first one is to catch an exception:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> TCP connection failure handling with a rescue statement"
puts
connection_settings = {
:port => 9689,
:vhost => "/amq_client_testbed",
:user => "amq_client_gem",
:password => "amq_client_gem_password",
:timeout => 0.3
}
begin
AMQP.start(connection_settings) do |connection, open_ok|
raise "This should not be reachable"
end
rescue AMQP::TCPConnectionFailed => e
puts "Caught AMQP::TCPConnectionFailed => TCP connection failed, as expected."
end
{AMQP.connect} (and {AMQP.start}) will raise {AMQP::TCPConnectionFailed} if connection fails. Code that catches it can write to log
about the issue or use retry to execute begin block one more time. Because initial connection failures are due to misconfiguration or network outage, reconnection
to the same endpoint (hostname, port, vhost combination) will result in the same issue over and over. TBD: failover, connection to the cluster.
Alternative way of handling connection failure is with an errback (a callback for specific kind of error):
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> TCP connection failure handling with a callback"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
:port => 9689,
:vhost => "/amq_client_testbed",
:user => "amq_client_gem",
:password => "amq_client_gem_password",
:timeout => 0.3,
:on_tcp_connection_failure => handler
}
AMQP.start(connection_settings) do |connection, open_ok|
raise "This should not be reachable"
end
:on_tcp_connection_failure option accepts any object that responds to #call.
If you connect to the broker from a code in a class (as opposed to top-level scope in a script), Object#method can be used to pass object method as a handler
instead of a Proc.
TBD: provide an example
h3. Detecting authentication failures
Another reason why connection may fail is authentication failure. Handling authentication failure is very similar to handling initial TCP
connection failure:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> Authentication failure handling with a callback"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
:port => 5672,
:vhost => "/amq_client_testbed",
:user => "amq_client_gem",
:password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
:timeout => 0.3,
:on_tcp_connection_failure => handler,
:on_possible_authentication_failure => Proc.new { |settings|
puts "Authentication failed, as expected, settings are: #{settings.inspect}"
EM.stop
}
}
AMQP.start(connection_settings) do |connection, open_ok|
raise "This should not be reachable"
end
default handler raises {AMQP::PossibleAuthenticationFailureError}:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> Authentication failure handling with a rescue block"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
:port => 5672,
:vhost => "/amq_client_testbed",
:user => "amq_client_gem",
:password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
:timeout => 0.3,
:on_tcp_connection_failure => handler
}
begin
AMQP.start(connection_settings) do |connection, open_ok|
raise "This should not be reachable"
end
rescue AMQP::PossibleAuthenticationFailureError => afe
puts "Authentication failed, as expected, caught #{afe.inspect}"
EventMachine.stop if EventMachine.reactor_running?
end
In case you wonder why callback name has "possible" in it: {http://bit.ly/mTr1YN AMQP 0.9.1 spec} requires broker implementations to
simply close TCP connection without sending any more data when an exception (such as authentication failure) occurs before AMQP connection
is open. In practice, however, when broker closes TCP connection between successful TCP connection and before AMQP connection is open,
it means that authentication has failed.
h2. Starting event loop & connecting in Web applications (Ruby on Rails, Sinatra, Merb, Rack)
Web applications are different from standalone applications in that main thread is occupied by Web/application server like Unicorn
or Thin, so you need to start EventMachine reactor before you attempt to use {AMQP.connect}.
In a Ruby on Rails app, probably the best place for this is in initializer (like config/initializers/amqp.rb). For Merb apps, it is config/init.rb.
For Sinatra and pure Rack applications, place it next to other configuration code.
Next we are going to discuss issues specific to particular Web servers.
h3. Using amqp gem with Unicorn
h4. Use separate thread for EventMachine reactor
Since Unicorn is not EventMachine-based, you need to start EventMachine reactor in a separate thread like this:
Thread.new { EventMachine.run }
# give EventMachine reactor a moment to start
sleep(0.5)
# now is a good moment to use AMQP.connect
Otherwise EventMachine will block current thread.
h4. Starting EventMachine reactor after Unicorn forks worker processes
Because *Unicorn is a pre-forking server, you need to run the same piece of code in
after_fork hook in Unicorn configuration file for your app*, otherwise, worker processes won't have EventMachine reactor running:
# example snippet of Unicorn configuration file ( config/unicorn/development.rb or similar)
after_fork do |server, worker|
Thread.new { EventMachine.run }
# give EventMachine reactor a moment to start
sleep(0.5)
# now is a good moment to use AMQP.connect
end
h3. Using amqp gem with Passenger
TBD: if you are a Passenger user, please help us write this section!
h3. Using amqp gem with Thin and Goliath
h4. Thin and Goliath start EventMachine reactor for you, but there is a little nuance
If you use "Thin":http://code.macournoyer.com/thin/ or "Goliath":https://github.com/postrank-labs/goliath/, you are all set: those two servers use EventMachine under the hood.
There is no need to start EventMachine reactor. However, depending on app server, it's version, version of the framework and Rack middleware being used,
EventMachine reactor start may be slightly delayed. To not depend on this factor, use EventMachine.next_tick to delay connection until after reactor is actually running:
EventMachine.next_tick { AMQP.connect(...) }
So in case EventMachine reactor isn't running yet on server/application boot, connection won't fail but instead wait for reactor to start.
Thin and Goliath are not pre-forking servers so there is no need to re-establish connection the way you do it with Unicorn and Passenger.
h2. If it just doesn't work: troubleshooting
If you read this guide yet your issue is still unresolved, check the following things before asking on the mailing list:
* AMQP broker log.
* List of users in a particular vhost you are trying to connect
* Network connectivity. We know, it's obvious, yet even experienced developers and devops engineers struggle with network access misconfigurations every once in a while.
* If EventMachine is started in a separate thread, make sure that isn't dead. If it is, this usually means there was an exception that caused it to terminate.
h3. Inspecting AMQP broker log file
In this section we will cover typical problems that can be tracked down by reading AMQP broker log. We will use RabbitMQ as an example, however, different AMQP brokers
often log most of the same issues.
RabbitMQ logs abrupt TCP connection failures, timeouts, protocol version mismatches and so on.
If you are running RabbitMQ, log locations for various operating systems and distributions is documented in the "RabbitMQ installation guide":http://www.rabbitmq.com/install.html
On Mac OS X, RabbitMQ installed via Homebrew logs to $HOMEBREW_HOME/var/log/rabbitmq/rabbit@$HOSTNAME.log. For example, if you have Homebrew installed at /usr/local and
your hostname is giove, log will be at /usr/local/var/log/rabbitmq/rabbit@giove.log.
Here is what authentication failure looks like in RabbitMQ log:
=ERROR REPORT==== 17-May-2011::17:37:58 === exception on TCP connection <0.4770.0> from 127.0.0.1:46551 {channel0_error,starting, {amqp_error,access_refused, "AMQPLAIN login refused: user 'pipeline_agent' - invalid credentials", 'connection.start_ok'}}This means that connection attempt with username pipeline_agent failed because credentials were invalid. If you are seeing this message, make sure username, password *and vhost* are correct. The following entry:
=ERROR REPORT==== 17-May-2011::17:26:28 === exception on TCP connection <0.4201.62> from 10.8.0.30:57990 {bad_header,<<65,77,81,80,0,0,9,1>>}Means that client supports AMQP 0.9.1 but broker doesn't (RabbitMQ versions pre-2.0 only support AMQP 0.8, for example). If you are using amqp gem 0.8 or later and seeing this entry in your broker log, you are connecting to AMQP broker that is too old to support this AMQP version. In case of RabbitMQ, make sure you run version 2.0 or later. h2. What to read next * {file:docs/Queues.textile Queues} * {file:docs/ErrorHandling.textile Error handling} * {file:docs/ConnectionEncryptionWithTLS.textile Using TLS (SSL)} (if you want to use SSL encrypted connection to the broker) h2. Tell us what you think! Please take a moment and tell us what you think about this guide on "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp: what was unclear? what wasn't covered? maybe you don't like guide style or grammar and spelling are incorrect? Readers feedback is key to making documentation better. If mailing list communication is not an option for you for some reason, you can "contact guides author directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation