lib/amqp/session.rb in amqp-1.7.0 vs lib/amqp/session.rb in amqp-1.8.0
- old
+ new
@@ -5,10 +5,11 @@
require "amqp/auth_mechanism_adapter"
require "amqp/broker"
require "amqp/channel"
require "amqp/channel_id_allocator"
+require "amq/settings"
module AMQP
# AMQP session represents connection to the broker. Session objects let you define callbacks for
# various TCP connection lifecycle events, for instance:
#
@@ -60,41 +61,26 @@
attr_reader :callbacks
# The locale defines the language in which the server will send reply texts.
#
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.2)
attr_accessor :locale
# Client capabilities
#
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2.1)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.2.1)
attr_accessor :client_properties
- # Server properties
- #
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
- attr_reader :server_properties
-
- # Server capabilities
- #
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
- attr_reader :server_capabilities
-
- # Locales server supports
- #
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
- attr_reader :server_locales
-
# Authentication mechanism used.
#
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.2)
attr_reader :mechanism
# Authentication mechanisms broker supports.
#
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.2)
attr_reader :server_authentication_mechanisms
# Channels within this connection.
#
# @see http://bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2.5)
@@ -102,26 +88,27 @@
# Maximum channel number that the server permits this connection to use.
# Usable channel numbers are in the range 1..channel_max.
# Zero indicates no specified limit.
#
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1)
attr_accessor :channel_max
# Maximum frame size that the server permits this connection to use.
#
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2)
attr_accessor :frame_max
+ attr_accessor :connection_timeout
attr_reader :known_hosts
class << self
# Settings
def settings
- @settings ||= AMQP::Settings.default
+ @settings ||= AMQ::Settings.default
end
def logger
@logger ||= begin
require "logger"
@@ -154,10 +141,13 @@
# @group Connecting, reconnecting, disconnecting
def initialize(*args, &block)
super(*args)
+ connection_options_or_string = args.first
+ other_options = args[1] || {}
+
self.logger = self.class.logger
# channel => collected frames. MK.
@frames = Hash.new { Array.new }
@channels = Hash.new
@@ -168,13 +158,11 @@
# track TCP connection state, used to detect initial TCP connection failures.
@tcp_connection_established = false
@tcp_connection_failed = false
@intentionally_closing_connection = false
- # EventMachine::Connection's and Adapter's constructors arity
- # make it easier to use *args. MK.
- @settings = Settings.configure(args.first)
+ @settings = AMQ::Settings.configure(connection_options_or_string).merge(other_options)
@on_tcp_connection_failure = Proc.new { |settings|
closed!
if cb = @settings[:on_tcp_connection_failure]
cb.call(settings)
@@ -184,20 +172,21 @@
}
@on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
raise self.class.authentication_failure_exception_class.new(settings)
}
-
- @mechanism = @settings.fetch(:auth_mechanism, "PLAIN")
+ @mechanism = normalize_auth_mechanism(@settings.fetch(:auth_mechanism, "PLAIN"))
@locale = @settings.fetch(:locale, "en_GB")
@client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))
@auto_recovery = (!!@settings[:auto_recovery])
+ @connection_timeout = (@settings[:timeout] || @settings[:connection_timeout] || 3).to_f
+
self.reset
- self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
- end # initialize(*args, &block)
+ self.set_pending_connect_timeout(@connection_timeout) unless defined?(JRUBY_VERSION)
+ end # initialize
# @return [Boolean] true if this AMQP connection is currently open
# @api plugin
def connected?
self.opened?
@@ -229,11 +218,11 @@
end # username
alias user username
# Properly close connection with AMQ broker, as described in
- # section 2.2.4 of the {http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}.
+ # section 2.2.4 of the {https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf AMQP 0.9.1 specification}.
#
# @api plugin
# @see #close_connection
def disconnect(reply_code = 200, reply_text = "Goodbye", &block)
@intentionally_closing_connection = true
@@ -261,23 +250,23 @@
# @group Broker information
# Server properties (product information, platform, et cetera)
#
# @return [Hash]
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
attr_reader :server_properties
# Server capabilities (usually used to detect AMQP 0.9.1 extensions like basic.nack, publisher
# confirms and so on)
#
# @return [Hash]
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
attr_reader :server_capabilities
# Locales server supports
#
- # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol documentation (Section 1.4.2.1.3)
attr_reader :server_locales
# @return [AMQP::Broker] Broker this connection is established with
def broker
@broker ||= AMQP::Broker.new(@server_properties)
@@ -449,12 +438,13 @@
# @option settings [String] :timeout (nil) Connection timeout.
# @option settings [Fixnum] :heartbeat (0) Connection heartbeat, in seconds.
# @option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead.
#
# @param [Hash] settings
- def self.connect(settings = {}, &block)
- @settings = Settings.configure(settings)
+ # def self.connect(settings = {}, &block)
+ def self.connect(connection_string_or_opts = ENV['RABBITMQ_URL'], other_options = {}, &block)
+ @settings = AMQ::Settings.configure(connection_string_or_opts).merge(other_options)
instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
instance.register_connection_callback(&block)
instance
@@ -483,18 +473,11 @@
# Similar to #reconnect, but uses different connection settings
# @see #reconnect
# @api public
def reconnect_to(connection_string_or_options, period = 5)
- settings = case connection_string_or_options
- when String then
- AMQP.parse_connection_uri(connection_string_or_options)
- when Hash then
- connection_string_or_options
- else
- Hash.new
- end
+ settings = AMQ::Settings.configure(connection_string_or_opts)
if !@reconnecting
@reconnecting = true
self.reset
end
@@ -761,11 +744,11 @@
# Returns heartbeat interval this client uses, in seconds.
# This value may or may not be used depending on broker capabilities.
# Zero means the server does not want a heartbeat.
#
# @return [Fixnum] Heartbeat interval this client uses, in seconds.
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.6)
def heartbeat_interval
@heartbeat_interval
end # heartbeat_interval
# Returns true if heartbeats are enabled (heartbeat interval is greater than 0)
@@ -892,11 +875,11 @@
# Sends connection.open to the server.
#
# @api plugin
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.7)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.7)
def open(vhost = "/")
self.send_frame(AMQ::Protocol::Connection::Open.encode(vhost))
end
# Resets connection state.
@@ -993,11 +976,11 @@
# Handles connection.start.
#
# @api plugin
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.1.)
def handle_start(connection_start)
@server_properties = connection_start.server_properties
@server_capabilities = @server_properties["capabilities"]
@server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
@@ -1016,11 +999,11 @@
# Handles Connection.Tune-Ok.
#
# @api plugin
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.6)
def handle_tune(connection_tune)
@channel_max = connection_tune.channel_max.freeze
@frame_max = connection_tune.frame_max.freeze
client_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
@@ -1033,11 +1016,11 @@
# Handles Connection.Open-Ok.
#
# @api plugin
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.8.)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.8.)
def handle_open_ok(open_ok)
@known_hosts = open_ok.known_hosts.dup.freeze
opened!
self.connection_successful if self.respond_to?(:connection_successful)
@@ -1045,11 +1028,11 @@
# Handles connection.close. When broker detects a connection level exception, this method is called.
#
# @api plugin
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.5.2.9)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.5.2.9)
def handle_close(conn_close)
closed!
# getting connection.close during connection negotiation means authentication
# has failed (RabbitMQ 3.2+):
# http://www.rabbitmq.com/auth-notification.html
@@ -1061,11 +1044,11 @@
# Handles Connection.Close-Ok.
#
# @api plugin
- # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.10)
+ # @see https://www.rabbitmq.com/resources/specs/amqp-xml-doc0-9-1.pdf AMQP 0.9.1 protocol reference (Section 1.4.2.10)
def handle_close_ok(close_ok)
closed!
self.disconnection_successful
end # handle_close_ok(close_ok)
@@ -1183,7 +1166,20 @@
start_tls(tls_options)
elsif tls_options
start_tls
end
end # upgrade_to_tls_if_necessary
+
+ private
+
+ def normalize_auth_mechanism(value)
+ case value
+ when [] then
+ "PLAIN"
+ when nil then
+ "PLAIN"
+ else
+ value
+ end
+ end
end # Session
end # AMQP