module Bunny
=begin rdoc
=== DESCRIPTION:
*Exchanges* are the routing and distribution hub of AMQP. All messages that Bunny sends
to an AMQP broker/server _have_ to pass through an exchange in order to be routed to a
destination queue. The AMQP specification defines the types of exchange that you can create.
At the time of writing there are four (4) types of exchange defined -
* :direct
* :fanout
* :topic
* :headers
AMQP-compliant brokers/servers are required to provide default exchanges for the _direct_ and
_fanout_ exchange types. All default exchanges are prefixed with 'amq.', for example -
* amq.direct
* amq.fanout
* amq.topic
* amq.match or amq.headers
If you want more information about exchanges, please consult the documentation for your
target broker/server or visit the {AMQP website}[http://www.amqp.org] to find the version of the
specification that applies to your target broker/server.
=end
class Exchange
  attr_reader :client, :type, :name, :opts, :key

  # === DESCRIPTION:
  # Creates a handle on an exchange and registers it in <tt>client.exchanges</tt> under its
  # name (an existing entry for that name is left in place). Unless the exchange is the
  # nameless exchange ('') or carries the reserved 'amq.' prefix, an Exchange::Declare
  # method is sent to the broker/server and the reply is checked.
  #
  # For 'amq.'-prefixed names the type is derived from the name ('amq.topic' => :topic,
  # 'amq.match' => :headers) and any :type option is ignored.
  #
  # ==== OPTIONS:
  # * :type => exchange type, <tt>:direct</tt> (_default_) - Only used for names without
  #   the 'amq.' prefix.
  # * :key => 'routing_key' - Default routing key used by #publish when none is given.
  # * :nowait => true or false (_default_) - Ignored by Bunny, always _false_.
  # * Any other entries are passed through to the Exchange::Declare method.
  #
  # The options are copied; the hash passed in by the caller is never modified. The copy
  # (without :nowait) is what #opts returns.
  #
  # ==== RAISES:
  # * Bunny::ConnectionError if <tt>client.status</tt> is <tt>:not_connected</tt>.
  # * Bunny::ProtocolError if the reply to the declaration is not Exchange::DeclareOk.
  def initialize(client, name, opts = {})
    # check connection to server
    raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected

    # Work on a private copy so the caller's hash is left untouched (it may be frozen or
    # reused). :nowait is dropped, otherwise the program would hang waiting for a response
    # that will not be sent by the server.
    opts = opts.dup
    opts.delete(:nowait)

    @client, @name, @opts = client, name, opts

    # \A anchors at the start of the string; ^ would also match after an embedded newline
    reserved = !(name =~ /\Aamq\./).nil?
    @type = reserved ? reserved_type(name) : (opts[:type] || :direct)
    @key = opts[:key]
    @client.exchanges[@name] ||= self

    # Reserved 'amq.' exchanges (including 'amq.match', whose type is :headers) and the
    # nameless exchange are never declared from here.
    return if reserved || name == ''

    client.send_frame(
      Qrack::Protocol::Exchange::Declare.new(
        { :exchange => name, :type => type, :nowait => false }.merge(opts)
      )
    )
    expect_reply(
      Qrack::Protocol::Exchange::DeclareOk,
      "Error declaring exchange #{name}: type = #{type}"
    )
  end

  # === DESCRIPTION:
  # Publishes a message to a specific exchange. The message will be routed to queues as defined
  # by the exchange configuration and distributed to any active consumers when the transaction,
  # if any, is committed.
  #
  # +data+ is converted with +to_s+ and its size is reported to the broker/server in bytes,
  # so multibyte payloads are framed correctly. The hash passed as +opts+ is not modified.
  #
  # ==== OPTIONS:
  # * :key => 'routing_key' - Specifies the routing key for the message. The routing key is
  #   used for routing messages depending on the exchange configuration. Falls back to the
  #   :key the exchange was created with.
  # * :mandatory => true or false (_default_) - Tells the server how to react if the message
  #   cannot be routed to a queue. If set to _true_, the server will return an unroutable message
  #   with a Return method. If set to _false_, the server silently drops the message.
  # * :immediate => true or false (_default_) - Tells the server how to react if the message
  #   cannot be routed to a queue consumer immediately. If set to _true_, the server will return an
  #   undeliverable message with a Return method. If set to _false_, the server will queue the message,
  #   but with no guarantee that it will ever be consumed.
  # * :persistent => true or false (_default_) - Selects delivery mode 2 when _true_,
  #   delivery mode 1 otherwise.
  #
  # ==== RETURNS:
  # Whatever <tt>client.send_frame</tt> returns (nil is the documented intent).
  def publish(data, opts = {})
    opts = opts.dup # never mutate the caller's options
    routing_key = opts.delete(:key) || key

    method_frame = Qrack::Protocol::Basic::Publish.new(
      { :exchange => name, :routing_key => routing_key }.merge(opts)
    )

    payload = data.to_s
    delivery_mode = opts.delete(:persistent) ? 2 : 1
    header_frame = Qrack::Protocol::Header.new(
      Qrack::Protocol::Basic,
      payload.bytesize, # size in bytes, not characters
      {
        :content_type => 'application/octet-stream',
        :delivery_mode => delivery_mode,
        :priority => 0
      }.merge(opts)
    )

    body_frame = Qrack::Transport::Body.new(payload)

    client.send_frame(method_frame, header_frame, body_frame)
  end

  # === DESCRIPTION:
  # Requests that an exchange is deleted from broker/server. Removes reference from exchanges
  # if successful. If an error occurs raises _Bunny_::_ProtocolError_. The hash passed as
  # +opts+ is not modified.
  #
  # ==== Options:
  # * :if_unused => true or false (_default_) - If set to _true_, the server will only
  #   delete the exchange if it has no queue bindings. If the exchange has queue bindings the
  #   server does not delete it but raises a channel exception instead.
  # * :nowait => true or false (_default_) - Ignored by Bunny, always _false_.
  #
  # ==== Returns:
  # <tt>:delete_ok</tt> if successful
  def delete(opts = {})
    # ignore the :nowait option if passed, otherwise program will hang waiting for a
    # response that will not be sent by the server
    opts = opts.dup
    opts.delete(:nowait)

    client.send_frame(
      Qrack::Protocol::Exchange::Delete.new({ :exchange => name, :nowait => false }.merge(opts))
    )
    expect_reply(Qrack::Protocol::Exchange::DeleteOk, "Error deleting exchange #{name}")

    client.exchanges.delete(name)

    # return confirmation
    :delete_ok
  end

  private

  # Derives the exchange type from a reserved 'amq.' name; 'amq.match' maps to :headers.
  def reserved_type(exchange_name)
    suffix = exchange_name.sub(/\Aamq\./, '')
    suffix = 'headers' if suffix == 'match'
    suffix.to_sym
  end

  # Reads the next method from the client and raises Bunny::ProtocolError with +message+
  # unless it is an instance of +expected+.
  def expect_reply(expected, message)
    reply = client.next_method
    raise Bunny::ProtocolError, message unless reply.is_a?(expected)

    reply
  end
end
end