lib/amqp.rb in amqp-1.1.0.pre1 vs lib/amqp.rb in amqp-1.1.0.pre2

- old
+ new

@@ -1,16 +1,250 @@ # encoding: utf-8 require "amq/protocol" -require "amq/client" -require "amq/client/adapters/event_machine" require "amqp/version" if RUBY_VERSION =~ /^1.8.7/ require "amqp/compatibility/ruby187_patchlevel_check" end + +require "amqp/handlers_registry" +require "amqp/callbacks" +require "amqp/entity" + require "amqp/exceptions" -require "amqp/connection" +require "amqp/settings" +require "amqp/deferrable" +require "amqp/session" require "amqp/exchange" require "amqp/queue" require "amqp/channel" require "amqp/header" + +# Top-level namespace of amqp gem. Please refer to "See also" section below. +# +# @see AMQP.connect +# @see AMQP.start +# @see AMQP::Channel +# @see AMQP::Exchange +# @see AMQP::Queue +module AMQP + + # Starts EventMachine event loop unless it is already running and connects + # to AMQP broker using {AMQP.connect}. It is generally a good idea to + # start EventMachine event loop in a separate thread and use {AMQP.connect} + # (for Web applications that do not use Thin or Goliath, it is the only option). + # + # See {AMQP.connect} for information about arguments this method takes and + # information about relevant topics such as authentication failure handling. + # + # @example Using AMQP.start to connect to AMQP broker, EventMachine loop isn't yet running + # AMQP.start do |connection| + # # default is to connect to localhost:5672, to root ("/") vhost as guest/guest + # + # # this block never exits unless either AMQP.stop or EM.stop + # # is called. + # + # AMQP::Channel(connection) do |channel| + # channel.queue("", :auto_delete => true).bind(channel.fanout("amq.fanout")).subscribe do |headers, payload| + # # handle deliveries here + # end + # end + # end + # + # @api public + def self.start(connection_options_or_string = {}, other_options = {}, &block) + EM.run do + if !@connection || @connection.closed? || @connection.closing? + @connection = connect(connection_options_or_string, other_options, &block) + end + @channel = Channel.new(@connection) + @connection + end + end + + # Alias for {AMQP.start} + # @api public + def self.run(*args, &block) + self.start(*args, &block) + end + + # Properly closes default AMQP connection and then underlying TCP connection. + # Pass it a block if you want a piece of code to be run once default connection + # is successfully closed. + # + # @note If default connection was never established or is in the closing state already, + # this method has no effect. + # @api public + def self.stop(reply_code = 200, reply_text = "Goodbye", &block) + return if @connection.nil? || self.closing? + + EM.next_tick do + if AMQP.channel and AMQP.channel.open? and AMQP.channel.connection.open? + AMQP.channel.close + end + AMQP.channel = nil + + + shim = Proc.new { + block.call + + AMQP.connection = nil + } + @connection.disconnect(reply_code, reply_text, &shim) + end + end + + # Indicates that default connection is closing. + # + # @return [Boolean] + # @api public + def self.closing? + @connection.closing? + end + + # @return [Boolean] Current global logging value + # @api public + def self.logging + self.settings[:logging] + end + + # @return [Boolean] Sets current global logging value + # @api public + def self.logging=(value) + self.settings[:logging] = !!value + end + + # Default connection. When you do not pass connection instance to methods like + # {Channel#initialize}, AMQP gem will use this default connection. + # + # @api public + def self.connection + @connection + end + + # "Default channel". A placeholder for apps that only want to use one channel. This channel is not global, *not* used + # under the hood by methods like {AMQP::Exchange#initialize} and only shared by exchanges/queues you decide on. + # To reiterate: this is only a conventience accessor, since many apps (especially Web apps) can get by with just one + # connection and one channel. + # + # @api public + def self.channel + @channel + end + + # A placeholder for applications that only need one channel. If you use {AMQP.start} to set up default connection, + # {AMQP.channel} is open on that connection, but can be replaced by your application. + # + # + # @see AMQP.channel + # @api public + def self.channel=(value) + @channel = value + end + + # Sets global connection object. + # @api public + def self.connection=(value) + @connection = value + end + + # Alias for {AMQP.connection} + # @deprecated + # @api public + def self.conn + warn "AMQP.conn will be removed in 1.0. Please use AMQP.connection." + @connection + end + + # Alias for {AMQP.connection=} + # @deprecated + # @api public + def self.conn=(value) + warn "AMQP.conn= will be removed in 1.0. Please use AMQP.connection=(connection)." + self.connection = value + end + + # Connects to AMQP broker and yields connection object to the block as soon + # as connection is considered open. + # + # + # @example Using AMQP.connect with default connection settings + # + # AMQP.connect do |connection| + # AMQP::Channel.new(connection) do |channel| + # # channel is ready: set up your messaging flow by creating exchanges, + # # queues, binding them together and so on. + # end + # end + # + # @example Using AMQP.connect to connect to a public RabbitMQ instance with connection settings given as a hash + # + # AMQP.connect(:host => "dev.rabbitmq.com", :username => "guest", :password => "guest") do |connection| + # AMQP::Channel.new(connection) do |channel| + # # ... + # end + # end + # + # + # @example Using AMQP.connect to connect to a public RabbitMQ instance with connection settings given as a URI + # + # AMQP.connect "amqp://guest:guest@dev.rabbitmq.com:5672", :on_possible_authentication_failure => Proc.new { puts("Looks like authentication has failed") } do |connection| + # AMQP::Channel.new(connection) do |channel| + # # ... + # end + # end + # + # + # @overload connect(connection_string, options = {}) + # Used to pass connection parameters as a connection string + # @param [String] :connection_string AMQP connection URI, à la JDBC connection string. For example: amqp://bus.megacorp.internal:5877/qa + # + # + # @overload connect(connection_options) + # Used to pass connection options as a Hash. + # @param [Hash] :connection_options AMQP connection options (:host, :port, :username, :vhost, :password) + # + # @option connection_options_or_string [String] :host ("localhost") Host to connect to. + # @option connection_options_or_string [Integer] :port (5672) Port to connect to. + # @option connection_options_or_string [String] :vhost ("/") Virtual host to connect to. + # @option connection_options_or_string [String] :username ("guest") Username to use. Also can be specified as :user. + # @option connection_options_or_string [String] :password ("guest") Password to use. Also can be specified as :pass. + # @option connection_options_or_string [Hash] :ssl TLS (SSL) parameters to use. + # @option connection_options_or_string [Fixnum] :heartbeat (0) Connection heartbeat, in seconds. 0 means no heartbeat. Can also be configured server-side starting with RabbitMQ 3.0. + # @option connection_options_or_string [#call] :on_tcp_connection_failure A callable object that will be run if connection to server fails + # @option connection_options_or_string [#call] :on_possible_authentication_failure A callable object that will be run if authentication fails (see Authentication failure section) + # + # + # h2. Handling authentication failures + # + # AMQP 0.9.1 specification dictates that broker closes TCP connection when it detects that authentication + # has failed. However, broker does exactly the same thing when other connection-level exception occurs + # so there is no way to guarantee that connection was closed because of authentication failure. + # + # Because of that, AMQP gem follows Java client example and hints at _possibility_ of authentication failure. + # To handle it, pass a callable object (a proc, a lambda, an instance of a class that responds to #call) + # with :on_possible_authentication_failure option. + # + # @note This method assumes that EventMachine even loop is already running. If it is not the case or you are not sure, we recommend you use {AMQP.start} instead. + # It takes exactly the same parameters. + # @return [AMQP::Session] + # @api public + def self.connect(connection_options_or_string = {}, other_options = {}, &block) + opts = case connection_options_or_string + when String then + AMQP::Settings.parse_connection_uri(connection_options_or_string) + when Hash then + connection_options_or_string + else + Hash.new + end + + AMQP::Session.connect(opts.merge(other_options), &block) + end + + # @return [Hash] Default AMQP connection settings. This hash may be modified. + # @api public + def self.settings + @settings ||= AMQP::Settings.default + end +end # AMQP