module Bunny =begin rdoc === DESCRIPTION: *Exchanges* are the routing and distribution hub of AMQP. All messages that Bunny sends to an AMQP broker/server _have_ to pass through an exchange in order to be routed to a destination queue. The AMQP specification defines the types of exchange that you can create. At the time of writing there are four (4) types of exchange defined - * :direct * :fanout * :topic * :headers AMQP-compliant brokers/servers are required to provide default exchanges for the _direct_ and _fanout_ exchange types. All default exchanges are prefixed with 'amq.', for example - * amq.direct * amq.fanout * amq.topic * amq.match or amq.headers If you want more information about exchanges, please consult the documentation for your target broker/server or visit the {AMQP website}[http://www.amqp.org] to find the version of the specification that applies to your target broker/server. =end class Exchange attr_reader :client, :type, :name, :opts, :key def initialize(client, name, opts = {}) # check connection to server raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected @client, @name, @opts = client, name, opts # set up the exchange type catering for default names if name.match(/^amq\./) new_type = name.sub(/amq\./, '') # handle 'amq.match' default new_type = 'headers' if new_type == 'match' @type = new_type.to_sym else @type = opts[:type] || :direct end @key = opts[:key] @client.exchanges[@name] ||= self # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server opts.delete(:nowait) unless name == "amq.#{type}" or name == '' client.send_frame( Qrack::Protocol::Exchange::Declare.new( { :exchange => name, :type => type, :nowait => false }.merge(opts) ) ) raise Bunny::ProtocolError, "Error declaring exchange #{name}: type = #{type}" unless client.next_method.is_a?(Qrack::Protocol::Exchange::DeclareOk) end end =begin rdoc === DESCRIPTION: Publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed. ==== OPTIONS: * :key => 'routing_key' - Specifies the routing key for the message. The routing key is used for routing messages depending on the exchange configuration. * :mandatory => true or false (_default_) - Tells the server how to react if the message cannot be routed to a queue. If set to _true_, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message. * :immediate => true or false (_default_) - Tells the server how to react if the message cannot be routed to a queue consumer immediately. If set to _true_, the server will return an undeliverable message with a Return method. If set to _false_, the server will queue the message, but with no guarantee that it will ever be consumed. ==== RETURNS: nil =end def publish(data, opts = {}) out = [] out << Qrack::Protocol::Basic::Publish.new( { :exchange => name, :routing_key => opts.delete(:key) || key }.merge(opts) ) data = data.to_s out << Qrack::Protocol::Header.new( Qrack::Protocol::Basic, data.length, { :content_type => 'application/octet-stream', :delivery_mode => (opts.delete(:persistent) ? 2 : 1), :priority => 0 }.merge(opts) ) out << Qrack::Transport::Body.new(data) client.send_frame(*out) end =begin rdoc === DESCRIPTION: Requests that an exchange is deleted from broker/server. Removes reference from exchanges if successful. If an error occurs raises _Bunny_::_ProtocolError_. ==== Options: * :if_unused => true or false (_default_) - If set to _true_, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead. * :nowait => true or false (_default_) - Ignored by Bunny, always _false_. ==== Returns: :delete_ok if successful =end def delete(opts = {}) # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server opts.delete(:nowait) client.send_frame( Qrack::Protocol::Exchange::Delete.new({ :exchange => name, :nowait => false }.merge(opts)) ) raise Bunny::ProtocolError, "Error deleting exchange #{name}" unless client.next_method.is_a?(Qrack::Protocol::Exchange::DeleteOk) client.exchanges.delete(name) # return confirmation :delete_ok end end end