lib/amqp/session.rb in amqp-1.7.0 vs lib/amqp/session.rb in amqp-1.8.0

- old
+ new

@@ -5,10 +5,11 @@ require "amqp/auth_mechanism_adapter" require "amqp/broker" require "amqp/channel" require "amqp/channel_id_allocator" +require "amq/settings" module AMQP # AMQP session represents connection to the broker. Session objects let you define callbacks for # various TCP connection lifecycle events, for instance: # @@ -60,41 +61,26 @@ attr_reader :callbacks # The locale defines the language in which the server will send reply texts. # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.2) attr_accessor :locale # Client capabilities # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2.1) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.2.1) attr_accessor :client_properties - # Server properties - # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3) - attr_reader :server_properties - - # Server capabilities - # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3) - attr_reader :server_capabilities - - # Locales server supports - # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3) - attr_reader :server_locales - # Authentication mechanism used. # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.2) attr_reader :mechanism # Authentication mechanisms broker supports. # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.2) attr_reader :server_authentication_mechanisms # Channels within this connection. # # @see http://bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2.5) @@ -102,26 +88,27 @@ # Maximum channel number that the server permits this connection to use. # Usable channel numbers are in the range 1..channel_max. # Zero indicates no specified limit. # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1) attr_accessor :channel_max # Maximum frame size that the server permits this connection to use. # - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2) attr_accessor :frame_max + attr_accessor :connection_timeout attr_reader :known_hosts class << self # Settings def settings - @settings ||= AMQP::Settings.default + @settings ||= AMQ::Settings.default end def logger @logger ||= begin require "logger" @@ -154,10 +141,13 @@ # @group Connecting, reconnecting, disconnecting def initialize(*args, &block) super(*args) + connection_options_or_string = args.first + other_options = args[1] || {} + self.logger = self.class.logger # channel => collected frames. MK. @frames = Hash.new { Array.new } @channels = Hash.new @@ -168,13 +158,11 @@ # track TCP connection state, used to detect initial TCP connection failures. @tcp_connection_established = false @tcp_connection_failed = false @intentionally_closing_connection = false - # EventMachine::Connection's and Adapter's constructors arity - # make it easier to use *args. MK. - @settings = Settings.configure(args.first) + @settings = AMQ::Settings.configure(connection_options_or_string).merge(other_options) @on_tcp_connection_failure = Proc.new { |settings| closed! if cb = @settings[:on_tcp_connection_failure] cb.call(settings) @@ -184,20 +172,21 @@ } @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings| raise self.class.authentication_failure_exception_class.new(settings) } - - @mechanism = @settings.fetch(:auth_mechanism, "PLAIN") + @mechanism = normalize_auth_mechanism(@settings.fetch(:auth_mechanism, "PLAIN")) @locale = @settings.fetch(:locale, "en_GB") @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new)) @auto_recovery = (!!@settings[:auto_recovery]) + @connection_timeout = (@settings[:timeout] || @settings[:connection_timeout] || 3).to_f + self.reset - self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION) - end # initialize(*args, &block) + self.set_pending_connect_timeout(@connection_timeout) unless defined?(JRUBY_VERSION) + end # initialize # @return [Boolean] true if this AMQP connection is currently open # @api plugin def connected? self.opened? @@ -229,11 +218,11 @@ end # username alias user username # Properly close connection with AMQ broker, as described in - # section 2.2.4 of the {http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}. + # section 2.2.4 of the {https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf AMQP 0.9.1 specification}. # # @api plugin # @see #close_connection def disconnect(reply_code = 200, reply_text = "Goodbye", &block) @intentionally_closing_connection = true @@ -261,23 +250,23 @@ # @group Broker information # Server properties (product information, platform, et cetera) # # @return [Hash] - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3) attr_reader :server_properties # Server capabilities (usually used to detect AMQP 0.9.1 extensions like basic.nack, publisher # confirms and so on) # # @return [Hash] - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3) attr_reader :server_capabilities # Locales server supports # - # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3) attr_reader :server_locales # @return [AMQP::Broker] Broker this connection is established with def broker @broker ||= AMQP::Broker.new(@server_properties) @@ -449,12 +438,13 @@ # @option settings [String] :timeout (nil) Connection timeout. # @option settings [Fixnum] :heartbeat (0) Connection heartbeat, in seconds. # @option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead. # # @param [Hash] settings - def self.connect(settings = {}, &block) - @settings = Settings.configure(settings) + # def self.connect(settings = {}, &block) + def self.connect(connection_string_or_opts = ENV['RABBITMQ_URL'], other_options = {}, &block) + @settings = AMQ::Settings.configure(connection_string_or_opts).merge(other_options) instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings) instance.register_connection_callback(&block) instance @@ -483,18 +473,11 @@ # Similar to #reconnect, but uses different connection settings # @see #reconnect # @api public def reconnect_to(connection_string_or_options, period = 5) - settings = case connection_string_or_options - when String then - AMQP.parse_connection_uri(connection_string_or_options) - when Hash then - connection_string_or_options - else - Hash.new - end + settings = AMQ::Settings.configure(connection_string_or_opts) if !@reconnecting @reconnecting = true self.reset end @@ -761,11 +744,11 @@ # Returns heartbeat interval this client uses, in seconds. # This value may or may not be used depending on broker capabilities. # Zero means the server does not want a heartbeat. # # @return [Fixnum] Heartbeat interval this client uses, in seconds. - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.6) def heartbeat_interval @heartbeat_interval end # heartbeat_interval # Returns true if heartbeats are enabled (heartbeat interval is greater than 0) @@ -892,11 +875,11 @@ # Sends connection.open to the server. # # @api plugin - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.7) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.7) def open(vhost = "/") self.send_frame(AMQ::Protocol::Connection::Open.encode(vhost)) end # Resets connection state. @@ -993,11 +976,11 @@ # Handles connection.start. # # @api plugin - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.1.) def handle_start(connection_start) @server_properties = connection_start.server_properties @server_capabilities = @server_properties["capabilities"] @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ") @@ -1016,11 +999,11 @@ # Handles Connection.Tune-Ok. # # @api plugin - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.6) def handle_tune(connection_tune) @channel_max = connection_tune.channel_max.freeze @frame_max = connection_tune.frame_max.freeze client_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0 @@ -1033,11 +1016,11 @@ # Handles Connection.Open-Ok. # # @api plugin - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.8.) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.8.) def handle_open_ok(open_ok) @known_hosts = open_ok.known_hosts.dup.freeze opened! self.connection_successful if self.respond_to?(:connection_successful) @@ -1045,11 +1028,11 @@ # Handles connection.close. When broker detects a connection level exception, this method is called. # # @api plugin - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.5.2.9) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.5.2.9) def handle_close(conn_close) closed! # getting connection.close during connection negotiation means authentication # has failed (RabbitMQ 3.2+): # http://www.rabbitmq.com/auth-notification.html @@ -1061,11 +1044,11 @@ # Handles Connection.Close-Ok. # # @api plugin - # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.10) + # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.10) def handle_close_ok(close_ok) closed! self.disconnection_successful end # handle_close_ok(close_ok) @@ -1183,7 +1166,20 @@ start_tls(tls_options) elsif tls_options start_tls end end # upgrade_to_tls_if_necessary + + private + + def normalize_auth_mechanism(value) + case value + when [] then + "PLAIN" + when nil then + "PLAIN" + else + value + end + end end # Session end # AMQP