"master" branch: {<img src="https://travis-ci.org/rightscale/right_amqp.svg?branch=master" alt="Build Status" />}[https://travis-ci.org/rightscale/right_amqp]

= RightAMQP

= DESCRIPTION

== Synopsis

RightAMQP provides a high availability client for interfacing with the
RightScale RabbitMQ broker using the AMQP protocol. The AMQP version on which
this gem is based is 0.6.7 but beyond that it contains a number of bug fixes and
enhancements including reconnect, message return, heartbeat, and UTF-8 support.
The high availability is achieved by maintaining multiple broker connections
such that failed connections automatically reconnect and only connected
brokers are used when routing a message. Although the HABrokerClient class
is the intended primary means for accessing RabbitMQ services with this gem,
alternatively the underlying AMQP services may be used directly.

Refer to the wiki (https://github.com/rightscale/right_amqp/wiki) for up-to-date
documentation.

Also use the built-in issues tracker (https://github.com/rightscale/right_amqp/issues)
to report issues.

Maintained by the RightScale Cornsilk Team

== Interface

The focus here is on the interface provided by the RightAMQP::HABrokerClient class
for connecting to one or more brokers in high availability fashion, subscribing
to queues or exchanges on each of the brokers, publishing messages using any of
the brokers, and monitoring the status of the broker connections.

The namespace for high availability access is RightAMQP. The namespace for low
level AMQP support remains AMQP. Both rely on eventmachine for task management.

=== Connecting

Creating an HABrokerClient object causes connections to one or more brokers
to be created, e.g.,

  b = RightAMQP::HABrokerClient.new(serializer, :user => 'user', :pass => 'secret',
                                    :vhost => '/abc', :host => 'broker0.com,broker1.com')

would result in a connection to the brokers with domain names broker0.com and broker1.com
using the specified :user and :pass as RabbitMQ credentials and :vhost as the namespace
in which to operate. See the detailed code documentation for other configuration options.

To know when a connection has been established, #connection_status is used, e.g.,

  b.connection_status(:one_off => 60) do |status|
    if status == :connected
      # Perform other application setup including subscribing to queues
    elsif status == :failed
      puts "Could not connect to any brokers"
      EM.stop
    end
  end

with the :one_off option indicating that the application only wants to be notified
of the first connection status change, and in this case is only willing to wait
60 seconds for it.

=== Reconnecting, Heartbeat, and Status Updates

If a broker connection is lost, the HABrokerClient will automatically attempt
to reconnect on the interval specified with the :reconnect_interval option.
As further protection against lost connections the :heartbeat option may be
specified as the interval between AMQP keep alive messages being sent between
the application and the brokers. This is useful in firewall environments.

The #connection_status method used above to detect the initial connection may
also be used to monitor connectivity throughout the life of the application, e.g.,

  b.connection_status do |status|
    puts "Status changed to #{status}"
  end

For this request the status is only reported for all the brokers as a whole,
i.e., a :disconnected status means that connectivity to all brokers has been
lost, and a subsequent :connected status means that connection to at least
one broker has been regained. Finer-grained control is available by
indicating the specific :brokers of interest.
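
The aggregate rule described above can be illustrated with a small standalone sketch. The helper name aggregate_status and the per-broker state hash are hypothetical and are not part of the RightAMQP interface; they only model the "at least one broker connected" rule.

```ruby
# Hypothetical helper, not part of RightAMQP: collapse per-broker states
# into the single aggregate status described in the text.
def aggregate_status(broker_states)
  # :connected as soon as at least one broker is usable, otherwise :disconnected
  broker_states.values.any? { |state| state == :connected } ? :connected : :disconnected
end

states = {
  "rs-broker-broker0.com-5672" => :connected,
  "rs-broker-broker1.com-5672" => :disconnected
}
puts aggregate_status(states)   # one broker is still connected

states["rs-broker-broker0.com-5672"] = :disconnected
puts aggregate_status(states)   # connectivity to all brokers is lost
```

Under this rule the loss of a single broker does not trigger a status change as long as another broker remains connected.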

=== Subscribing, Declaring, and Unsubscribing

To receive messages via this interface, one must subscribe to one or more
AMQP queues or exchanges. This is done with the #subscribe method, e.g.,

  queue = {:name => 'my_queue', :options => {:durable => true}}
  exchange = nil  # no exchange is bound in this example
  broker_ids = b.subscribe(queue, exchange, :ack => true) do |id, msg|
    puts "Received packet #{msg.inspect} via broker #{id}"
  end

causes the queue named "my_queue" to be created on all currently connected
brokers in the AMQP durable fashion, meaning that it is preserved across
restarts of the broker. The #subscribe call returns the ids of the brokers
to which the subscribe request was submitted. The :ack option requests that
each message be explicitly acknowledged as soon as it is received, rather than
relying on implicit ack handling. The provided block is executed each time a
message is received on the queue.

If serialization is configured, the message delivered here is the result of
applying the serializer's #load method; otherwise the message is exactly the
bytes that were placed in the queue.

To unsubscribe from a queue the #unsubscribe method is used.

To simply create a queue or exchange without subscribing to it to receive
messages, use #declare.

=== Publishing

To publish a message to a queue or exchange, the #publish method is used, e.g.,

  queue = {:type => :queue, :name => "request", :options => {:no_declare => true}}
  broker_ids = b.publish(queue, message, :persistent => true, :mandatory => true)

causes the specified message to be published to the queue named "request".
The :no_declare option is specified to keep AMQP from attempting to create
the exchange before publishing to it. On the publish, the :persistent option
indicates that every attempt is to be made to preserve the message if the broker
is stopped or crashes, but this is not a guarantee. The :mandatory option
indicates that the broker is to return the message if the specified queue
does not exist or is not being consumed, i.e., subscribed to by another
application.

=== Serialization

If a serializer is supplied when the HABrokerClient is constructed, its #load
method is applied to all messages received from a broker, and its #dump method
is applied to all messages that are published. Even if a serializer is specified
it is possible to specify :no_unserialize for a particular subscription or
:no_serialize for a message being published.

If no serializer is supplied, individual messages published or received are
not logged. Further, a serialized message is probed for the existence of certain
properties, like :token, :type, and :from, and these are tracked so that if
a message is returned as undeliverable, this information can be provided on
the #non_delivery callback.
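
As a minimal sketch of what a serializer object could look like: the only requirement stated above is that it responds to #dump and #load. The class name JsonSerializer and the choice of JSON are assumptions made for illustration, not something this gem prescribes.

```ruby
require 'json'

# Hypothetical serializer: only the #dump and #load method names come from
# the text above. JSON is an arbitrary choice for illustration.
class JsonSerializer
  # Applied to each message before it is published
  def dump(message)
    JSON.generate(message)
  end

  # Applied to each message received from a broker
  def load(data)
    JSON.parse(data)
  end
end

serializer = JsonSerializer.new
wire = serializer.dump("type" => "/ping", "token" => "abc123")
message = serializer.load(wire)
puts message["type"]
```

An object like this would be passed as the first argument to RightAMQP::HABrokerClient.new, as in the Connecting example. Whether a plain Hash exposes :token, :type, and :from in the way the probing expects is not covered here and should be checked against the code documentation.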

=== Message Return and Non-Delivery Callback

If a broker returns a message because it cannot be delivered to the intended
recipient, e.g., because the option :mandatory was set and there are no consumers
or the broker is in the process of shutting down, the HABrokerClient attempts
to deliver the message using another broker in the configured set.

When it runs out of connected brokers to attempt the delivery, it declares
the message undeliverable. In this case it also executes the block, if any,
supplied by the application via the #non_delivery method. The data supplied
includes the reason, type, token, from, and to, with the type, token, and from
values being nil unless they could be extracted from the message being published.
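
The retry behavior described above can be sketched in plain Ruby. This is a simplified, synchronous model under stated assumptions: the method publish_with_failover, the broker hashes, and their :deliver lambdas are all hypothetical and do not mirror the internals of HABrokerClient, where message returns arrive asynchronously.

```ruby
# Hypothetical sketch of publishing with failover across brokers.
# Each broker is modeled as a Hash with :id, :connected, and a :deliver
# lambda that returns :ok or a return reason such as "NO_CONSUMERS".
def publish_with_failover(brokers, queue_name, message, on_non_delivery)
  last_reason = nil
  brokers.each do |broker|
    next unless broker[:connected]            # only connected brokers are used
    result = broker[:deliver].call(queue_name, message)
    return broker[:id] if result == :ok       # delivered, stop trying
    last_reason = result                      # remember why it was returned
  end
  # Out of brokers: report reason, type, token, from, and to.
  # type, token, and from stay nil because this sketch does not inspect the message.
  on_non_delivery.call(last_reason, nil, nil, nil, queue_name)
  nil
end

brokers = [
  {:id => "rs-broker-broker0.com-5672", :connected => true,
   :deliver => lambda { |_queue, _msg| "NO_CONSUMERS" }},
  {:id => "rs-broker-broker1.com-5672", :connected => true,
   :deliver => lambda { |_queue, _msg| :ok }}
]
undeliverable = []
report = lambda { |*fields| undeliverable << fields }

puts publish_with_failover(brokers, "request", "hello", report).inspect
```

In this run the first broker returns the message and the second accepts it, so the non-delivery block is never invoked.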

The possible reasons for non-delivery are:
  "NO_ROUTE" - queue does not exist
  "NO_CONSUMERS" - queue exists but it has no consumers
  "ACCESS_REFUSED" - queue not usable because broker is in the process of stopping service

=== Closing

When all connections to the brokers are to be closed, the #close method is used.

=== Identities

A broker is given an identity of the form "rs-broker-<hostname>-<port>" where
<hostname> is its host name, e.g., broker1.com, and <port> is the TCP port number
used (5672 by default). This is used in the interface when a specific broker
needs to be referred to, e.g., when communicating status.
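
For illustration, the identity format above can be reproduced with a short helper. The names broker_identity and broker_identities are hypothetical, and the sketch follows only the format stated in this section; any escaping of special characters that the gem itself may apply to host names is not modeled.

```ruby
DEFAULT_AMQP_PORT = 5672

# Hypothetical helper: build an identity of the form described in the text
def broker_identity(host, port = DEFAULT_AMQP_PORT)
  "rs-broker-#{host}-#{port}"
end

# Hypothetical helper: expand a comma-separated :host value, as used when
# constructing the client, into one identity per broker
def broker_identities(host_list, port = DEFAULT_AMQP_PORT)
  host_list.split(",").map { |host| broker_identity(host.strip, port) }
end

puts broker_identities("broker0.com,broker1.com")
```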

=== Error Callbacks

When constructing the HABrokerClient the :exception_callback option can be
specified to define the Proc to be activated on exception events. In addition
an :exception_on_receive_callback Proc can be specified for activation when
a message cannot be received.

=== Logging

To enable logging in the HABrokerClient set RightSupport::Log::Mixin.default_logger
to the logger in use that supports the standard ruby Logger interface. Logging
can be disabled on individual #subscribe and #publish requests with :no_log.
By default each message received or published is logged unless no serializer
is supplied.

Detailed AMQP logging can be enabled by setting AMQP.logging = true.

=== Status and Stats

The current status of all brokers can be obtained with the #status method.
The operation statistics are obtained with the #stats method. This method
has an option for resetting the statistics.

= ADDITIONAL RESOURCES

* [1] RabbitMQ documentation: http://www.rabbitmq.com/documentation.html

= LICENSE

<b>RightAMQP</b>

Copyright:: Copyright (c) 2012 RightScale, Inc.

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
'Software'), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.