h1. Connecting to the broker

h2. About this guide

This guide covers connecting to an AMQP broker from standalone and Web applications, connection error handling, authentication failure handling and related issues.

h2. Covered versions

This guide covers amqp gem v0.8.0 and later.

h2. Terminology

In this guide we define a standalone application as an application that does not run on a Web server like Unicorn or Passenger. The key difference is that these applications control the main Ruby VM thread and often use it to run the EventMachine event loop. When the amqp gem is used inside a Web application, the main thread is occupied by the Web application server, and the code required to establish a connection to the AMQP broker needs to be a little different.

h2. In standalone applications

h3. EventMachine event loop

The amqp gem uses "EventMachine":http://rubyeventmachine.com under the hood and needs the EventMachine event loop to be running in order to connect to an AMQP broker or send any data. This means that before connecting to the AMQP broker, we need to _start the EventMachine reactor_ (get the event loop going). Here is how to do it:

require "amqp"

EventMachine.run do
  # ...
end

"EventMachine.run":http://eventmachine.rubyforge.org/EventMachine.html#M000461 blocks the current thread until the event loop is stopped.

Standalone applications can often afford to start the event loop on the main thread. If you have no experience with threading, this is the recommended way.

h3. AMQP.connect with a block

Once the event loop is running, the {AMQP.connect} method will attempt to connect to the broker. It can be used in two ways. Here is the first one:

require "amqp"

EventMachine.run do
  # using AMQP.connect with a block
  AMQP.connect(:host => "localhost") do |client|
    # connection is open and ready to be used
  end
end

{AMQP.connect} takes a block that will be executed as soon as the AMQP connection is open, that is, once the TCP connection is set up, authentication has succeeded, and the broker and client have finished negotiating connection parameters such as maximum frame size.

h3. AMQP.connect without a callback

An alternative way of connecting is this:

require "amqp"

EventMachine.run do
  # using AMQP.connect without a block
  client = AMQP.connect(:host => "hub.megacorp.internal", :username => "hedgehog", :password => "t0ps3kr3t")
  # the connection is not open yet; the amqp gem delays channel operations
  # until after it is open, but it cannot solve every possible race
  # condition, so be careful
end

If you do not need to assign the returned value to a variable, the "block version" is recommended because it eliminates issues that may arise from attempts to use a connection object that is not fully open yet. For example, handling of authentication failures is simpler with the block version, as we will see in the following sections.

h3. AMQP.start

EventMachine.run and {AMQP.connect} with a block is such a common combination that the amqp gem provides a shortcut:

require "amqp"

AMQP.start("amqp://dev.rabbitmq.com:5672/") do |client|
  # connection is open and ready to be used
end

As these examples demonstrate, {AMQP.connect} and {AMQP.start} accept either a Hash of connection options or a connection URI string. See the reference documentation for each method to learn all the options they accept and what the default values are.
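
To make the correspondence between the two forms more concrete, here is a minimal sketch that splits a connection URI into Hash-style options using Ruby's standard URI library. The helper name @connection_options_from@ is hypothetical, and this is not the amqp gem's own parser. In particular, the sketch ignores the URI path, because how the gem maps the path to a vhost is not covered in this guide.

```ruby
require "uri"

# Hypothetical helper for illustration only; NOT the amqp gem's URI parser.
# It extracts only the parts whose meaning is unambiguous.
def connection_options_from(uri_string)
  uri = URI.parse(uri_string)

  options = {
    :host => uri.host,
    # URI does not know the amqp scheme, so the port is nil unless it is
    # given explicitly; 5672 is the standard AMQP port.
    :port => uri.port || 5672
  }
  options[:user]     = uri.user     if uri.user
  options[:password] = uri.password if uri.password
  # The URI path (vhost) is deliberately left out: see the reference
  # documentation for how the gem interprets it.
  options
end

p connection_options_from("amqp://dev.rabbitmq.com:5672/")
p connection_options_from("amqp://hedgehog:t0ps3kr3t@hub.megacorp.internal")
```

The resulting Hash uses the same option names (@:host@, @:port@, @:user@, @:password@) that appear in the Hash-based examples in this guide.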

h3. On Kernel#sleep use

When not passing a block to {AMQP.connect}, it is tempting to "give the connection some time to get through" by calling sleep (Kernel#sleep). Unless you are running the event loop in a separate thread, don't do this. sleep blocks the current thread, so if the event loop is running in that same thread, blocking it _will also block the event loop_. *When the event loop is blocked, no data is sent or received, so the connection does not proceed.*
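
The effect can be illustrated without a broker. The following is a toy sketch, not EventMachine: @ToyReactor@ and @elapsed_until_io@ are hypothetical names, and the "I/O" callback merely records when it ran. It shows that on a single-threaded loop, work queued behind a sleeping callback cannot start until the sleep is over.

```ruby
# Toy single-threaded event loop. This is NOT EventMachine; it only mimics
# the fact that callbacks run one at a time on the thread that runs the loop.
class ToyReactor
  def initialize
    @pending = []
  end

  # Queue a callback to be run by the loop.
  def next_tick(&callback)
    @pending << callback
  end

  # Run queued callbacks in order on the calling thread until none remain.
  def run
    @pending.shift.call until @pending.empty?
  end
end

# Returns how many seconds passed before the stand-in I/O callback could run.
def elapsed_until_io(blocking_seconds)
  reactor      = ToyReactor.new
  started_at   = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  io_ran_after = nil

  # Application code "giving the connection some time" on the loop's thread.
  reactor.next_tick { sleep(blocking_seconds) }

  # Stand-in for the network I/O the loop would perform to open a connection.
  reactor.next_tick do
    io_ran_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  end

  reactor.run
  io_ran_after
end

delay = elapsed_until_io(0.2)
puts format("I/O callback could only run after %.2f seconds", delay)
```

Sleeping longer only postpones the I/O further, which is why waiting this way never helps the connection "get through".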

h3. Detecting TCP connection failures

When applications connect to the broker, they need to handle connection failures. Networks are not 100% reliable, misconfigurations happen even with modern system configuration tools like Chef or Puppet, and the broker might be down, too. Error detection should happen as early as possible. There are two ways of detecting a TCP connection failure. The first one is to catch an exception:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

puts "=> TCP connection failure handling with a rescue statement"
puts

connection_settings = {
  :port     => 9689,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password",
  :timeout  => 0.3
}

begin
  AMQP.start(connection_settings) do |connection, open_ok|
    raise "This should not be reachable"
  end
rescue AMQP::TCPConnectionFailed => e
  puts "Caught AMQP::TCPConnectionFailed => TCP connection failed, as expected."
end

{AMQP.connect} (and {AMQP.start}) will raise {AMQP::TCPConnectionFailed} if the connection fails. Code that catches it can log the issue or use retry to execute the begin block one more time. Because initial connection failures are usually due to misconfiguration or a network outage, reconnecting to the same endpoint (hostname, port, vhost combination) is likely to result in the same issue over and over. TBD: failover, connection to the cluster.
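
Since retrying the same endpoint forever is unlikely to help, it makes sense to cap the number of attempts. Here is a minimal sketch of a bounded retry built on Ruby's retry keyword. The helper @with_connection_retries@ and the @FakeConnectionFailed@ class are hypothetical and not part of the amqp gem; the stand-in error class is used so the sketch runs without a broker.

```ruby
# Hypothetical helper (not part of the amqp gem): runs the block and re-runs
# it when error_class is raised, making at most max_attempts attempts.
def with_connection_retries(error_class, max_attempts: 3, delay: 1.0)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue error_class => e
    if attempts < max_attempts
      warn "Attempt #{attempts} failed (#{e.class}), retrying in #{delay}s"
      # Assumption: no event loop is running on this thread between
      # attempts, so pausing here does not stall any I/O.
      sleep(delay)
      retry
    end
    raise # out of attempts: let the caller see the last error
  end
end

# Stand-in for AMQP::TCPConnectionFailed so this sketch needs no broker.
class FakeConnectionFailed < StandardError; end

result = with_connection_retries(FakeConnectionFailed, max_attempts: 3, delay: 0) do |attempt|
  raise FakeConnectionFailed, "connection refused" if attempt < 3
  :connected
end
puts "Result: #{result.inspect}"

# With the amqp gem loaded, usage might look like this:
#   with_connection_retries(AMQP::TCPConnectionFailed, max_attempts: 3) do
#     AMQP.start(connection_settings) { |connection, open_ok| ... }
#   end
```

When all attempts fail, the last exception is re-raised, so the caller can still log it or abort.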

An alternative way of handling connection failure is with an errback (a callback for a specific kind of error):

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

puts "=> TCP connection failure handling with a callback"
puts

handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }

connection_settings = {
  :port     => 9689,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler
}

AMQP.start(connection_settings) do |connection, open_ok|
  raise "This should not be reachable"
end

The :on_tcp_connection_failure option accepts any object that responds to #call.

If you connect to the broker from code in a class (as opposed to top-level scope in a script), Object#method can be used to pass an object's method as a handler instead of a Proc.

TBD: provide an example
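
Until an official example is available, here is a minimal sketch of the Object#method approach. The @BrokerConnector@ class and its method names are hypothetical. The sketch only builds the settings Hash and does not connect, so it can be run without a broker.

```ruby
# BrokerConnector is a hypothetical class used only for illustration.
class BrokerConnector
  attr_reader :connection_settings, :failures

  def initialize(settings)
    @failures = []
    # method(:name) returns a Method object, which responds to #call
    # just like a Proc does.
    @connection_settings = settings.merge(
      :on_tcp_connection_failure => method(:handle_tcp_connection_failure)
    )
  end

  # Takes the connection settings as its argument, like the Proc-based
  # handlers shown in this guide.
  def handle_tcp_connection_failure(settings)
    @failures << settings
    warn "Failed to connect, settings are: #{settings.inspect}"
  end
end

connector = BrokerConnector.new({ :port => 9689, :timeout => 0.3 })
handler   = connector.connection_settings[:on_tcp_connection_failure]
puts "Handler responds to #call: #{handler.respond_to?(:call)}"

# With the amqp gem loaded, the settings would be passed on as usual:
#   AMQP.start(connector.connection_settings) { |connection, open_ok| ... }
```

Because the handler is a bound method, it has access to the object's state (here, the @failures@ list), which a top-level Proc would have to capture through a closure.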

h3. Detecting authentication failures

Another reason why a connection may fail is authentication failure. Handling authentication failure is very similar to handling initial TCP connection failure:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

puts "=> Authentication failure handling with a callback"
puts

handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }

connection_settings = {
  :port     => 5672,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler,
  :on_possible_authentication_failure => Proc.new { |settings|
    puts "Authentication failed, as expected, settings are: #{settings.inspect}"
    EM.stop
  }
}

AMQP.start(connection_settings) do |connection, open_ok|
  raise "This should not be reachable"
end

The default handler raises {AMQP::PossibleAuthenticationFailureError}:

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "amqp"

puts "=> Authentication failure handling with a rescue block"
puts

handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }

connection_settings = {
  :port     => 5672,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler
}

begin
  AMQP.start(connection_settings) do |connection, open_ok|
    raise "This should not be reachable"
  end
rescue AMQP::PossibleAuthenticationFailureError => afe
  puts "Authentication failed, as expected, caught #{afe.inspect}"
  EventMachine.stop if EventMachine.reactor_running?
end

In case you wonder why the callback name has "possible" in it: the {http://bit.ly/mTr1YN AMQP 0.9.1 spec} requires broker implementations to simply close the TCP connection without sending any more data when an exception (such as authentication failure) occurs before the AMQP connection is open, so the client is never told the reason. In practice, however, when the broker closes the TCP connection after it was successfully established but before the AMQP connection is open, it means that authentication has failed.

h2. In Web applications (Ruby on Rails, Sinatra, Merb, Rack)

Web applications are different from standalone applications in that the main thread is occupied by a Web/application server like Unicorn or Thin, so you need to start the EventMachine reactor before you attempt to use {AMQP.connect}.

In a Ruby on Rails app, probably the best place for this is an initializer (like config/initializers/amqp.rb). For Merb apps, it is config/init.rb. For Sinatra and pure Rack applications, place it next to other configuration code.

Next we are going to discuss issues specific to particular Web servers.

h3. Using amqp gem with Unicorn

h4. Use separate thread for EventMachine reactor

Since Unicorn is not EventMachine-based, you need to start the EventMachine reactor in a separate thread like this:

Thread.new { EventMachine.run }
# give EventMachine reactor a moment to start
sleep(0.5)
# now is a good moment to use AMQP.connect

Otherwise, EventMachine.run will block the current thread.
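
A fixed half-second pause is a guess about how long the reactor takes to start. As a possible alternative, the waiting thread can poll until the reactor reports that it is running. The sketch below uses a hypothetical helper, @wait_until@, and demonstrates it with a plain Ruby thread that flips a flag, so it runs without EventMachine installed.

```ruby
# Hypothetical helper: polls the block until it returns a truthy value,
# raising if that does not happen within the given timeout (in seconds).
def wait_until(timeout: 5.0, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
      raise "condition not met within #{timeout}s"
    end
    sleep(interval)
  end
  true
end

# Stand-in for a reactor that starts in another thread after a short delay.
ready   = false
starter = Thread.new do
  sleep(0.05)
  ready = true
end

wait_until(timeout: 2.0) { ready }
starter.join
puts "Background thread reported ready"

# With EventMachine loaded, the same helper could replace sleep(0.5):
#   Thread.new { EventMachine.run }
#   wait_until { EventMachine.reactor_running? }
#   # now is a good moment to use AMQP.connect
```

Polling from this thread is safe here because the reactor runs in a different thread, so the short sleeps do not block the event loop.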

h4. Starting EventMachine reactor after Unicorn forks worker processes

Because *Unicorn is a pre-forking server, you need to run the same piece of code in the after_fork hook in the Unicorn configuration file for your app*. Otherwise, worker processes won't have an EventMachine reactor running:

# example snippet of Unicorn configuration file (config/unicorn/development.rb or similar)
after_fork do |server, worker|
  Thread.new { EventMachine.run }
  # give EventMachine reactor a moment to start
  sleep(0.5)
  # now is a good moment to use AMQP.connect
end

h3. Using amqp gem with Passenger

TBD: if you are a Passenger user, please help us write this section!

h3. Using amqp gem with Thin and Goliath

h4. Thin and Goliath start EventMachine reactor for you, but there is a little nuance

If you use "Thin":http://code.macournoyer.com/thin/ or "Goliath":https://github.com/postrank-labs/goliath/, you are all set: those two servers use EventMachine under the hood. There is no need to start the EventMachine reactor. However, depending on the app server, its version, the version of the framework and the Rack middleware being used, EventMachine reactor start may be slightly delayed. To avoid depending on this factor, use EventMachine.next_tick to delay connection until after the reactor is actually running:

EventMachine.next_tick { AMQP.connect(...) }

So if the EventMachine reactor isn't running yet on server/application boot, the connection won't fail but will instead wait for the reactor to start.

Thin and Goliath are not pre-forking servers, so there is no need to re-establish the connection the way you do with Unicorn and Passenger.

h2. What to read next

* {file:docs/Queues.textile Queues}
* {file:docs/ErrorHandling.textile Error handling}
* {file:docs/ConnectionEncryptionWithTLS.textile Using TLS (SSL)} (if you want to use SSL encrypted connection to the broker)

h2. Tell us what you think!

Please take a moment and tell us what you think about this guide on the "Ruby AMQP mailing list":http://groups.google.com/group/ruby-amqp: what was unclear? What wasn't covered? Maybe you don't like the guide style, or the grammar and spelling are incorrect? Reader feedback is key to making documentation better.

If mailing list communication is not an option for you for some reason, you can "contact the guide's author directly":mailto:michael@novemberain.com?subject=amqp%20gem%20documentation