lib/amqp/session.rb in amqp-1.1.0.pre1 vs lib/amqp/session.rb in amqp-1.1.0.pre2
- old
+ new
@@ -1,10 +1,14 @@
# encoding: utf-8
-require "amq/client/adapters/event_machine"
+require "eventmachine"
+require "amqp/framing/string/frame"
+require "amqp/auth_mechanism_adapter"
require "amqp/broker"
+require "amqp/channel"
+
module AMQP
# AMQP session represents connection to the broker. Session objects let you define callbacks for
# various TCP connection lifecycle events, for instance:
#
# * Connection is established
@@ -26,27 +30,184 @@
# * {Session#reconnect}
# * {Session#connected?}
#
#
# @api public
- class Session < AMQ::Client::EventMachineClient
+ class Session < EM::Connection
+
#
+ # Behaviours
+ #
+
+ include Openable
+ include Callbacks
+
+ extend ProtocolMethodHandlers
+ extend RegisterEntityMixin
+
+
+ register_entity :channel, AMQP::Channel
+
+ #
# API
#
+ attr_accessor :logger
+ attr_accessor :settings
+
+ # @return [Array<#call>]
+ attr_reader :callbacks
+
+
+ # The locale defines the language in which the server will send reply texts.
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
+ attr_accessor :locale
+
+ # Client capabilities
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2.1)
+ attr_accessor :client_properties
+
+ # Server properties
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
+ attr_reader :server_properties
+
+ # Server capabilities
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
+ attr_reader :server_capabilities
+
+ # Locales server supports
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.3)
+ attr_reader :server_locales
+
+ # Authentication mechanism used.
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
+ attr_reader :mechanism
+
+ # Authentication mechanisms broker supports.
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.2)
+ attr_reader :server_authentication_mechanisms
+
+ # Channels within this connection.
+ #
+ # @see http://bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2.5)
+ attr_reader :channels
+
+ # Maximum channel number that the server permits this connection to use.
+ # Usable channel numbers are in the range 1..channel_max.
+ # Zero indicates no specified limit.
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.1 and 1.4.2.6.1)
+ attr_accessor :channel_max
+
+ # Maximum frame size that the server permits this connection to use.
+ #
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Sections 1.4.2.5.2 and 1.4.2.6.2)
+ attr_accessor :frame_max
+
+
+ attr_reader :known_hosts
+
+
+ class << self
+ # Settings
+ def settings
+ @settings ||= AMQP::Settings.default
+ end
+
+ def logger
+ @logger ||= begin
+ require "logger"
+ Logger.new(STDERR)
+ end
+ end
+
+ def logger=(logger)
+ methods = AMQP::Logging::REQUIRED_METHODS
+ unless methods.all? { |method| logger.respond_to?(method) }
+ raise AMQP::Logging::IncompatibleLoggerError.new(methods)
+ end
+
+ @logger = logger
+ end
+
+ # @return [Boolean] Current value of logging flag.
+ def logging
+ settings[:logging]
+ end
+
+ # Turns loggin on or off.
+ def logging=(boolean)
+ settings[:logging] = boolean
+ end
+
+
+ # Establishes connection to AMQ broker and returns it. New connection object is yielded to
+ # the block if it is given.
+ #
+ # @example Specifying adapter via the :adapter option
+ # AMQP::Adapter.connect(:adapter => "socket")
+ # @example Specifying using custom adapter class
+ # AMQP::SocketClient.connect
+ # @param [Hash] Connection parameters, including :adapter to use.
+ # @api public
+ def connect(settings = nil, &block)
+ @settings = Settings.configure(settings)
+
+ instance = self.new
+ instance.establish_connection(settings)
+ instance.register_connection_callback(&block)
+
+ instance
+ end
+ end
+
+
# @group Connecting, reconnecting, disconnecting
def initialize(*args, &block)
- super(*args, &block)
+ super(*args)
- @client_properties.merge!({
- :platform => ::RUBY_DESCRIPTION,
- :product => "AMQP gem",
- :information => "http://github.com/ruby-amqp/amqp",
- :version => AMQP::VERSION
- })
+ self.logger = self.class.logger
+
+ # channel => collected frames. MK.
+ @frames = Hash.new { Array.new }
+ @channels = Hash.new
+ @callbacks = Hash.new
+
+ opening!
+
+ # track TCP connection state, used to detect initial TCP connection failures.
+ @tcp_connection_established = false
+ @tcp_connection_failed = false
+ @intentionally_closing_connection = false
+
+ # EventMachine::Connection's and Adapter's constructors arity
+ # make it easier to use *args. MK.
+ @settings = Settings.configure(args.first)
+ @on_tcp_connection_failure = @settings[:on_tcp_connection_failure] || Proc.new { |settings|
+ raise self.class.tcp_connection_failure_exception_class.new(settings)
+ }
+ @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
+ raise self.class.authentication_failure_exception_class.new(settings)
+ }
+
+ @mechanism = @settings.fetch(:auth_mechanism, "PLAIN")
+ @locale = @settings.fetch(:locale, "en_GB")
+ @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))
+
+ @auto_recovery = (!!@settings[:auto_recovery])
+
+ self.reset
+ self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
end # initialize(*args, &block)
# @return [Boolean] true if this AMQP connection is currently open
# @api plugin
def connected?
@@ -78,47 +239,31 @@
@settings[:user]
end # username
alias user username
- # Reconnect to the broker using current connection settings.
- #
- # @param [Boolean] force Enforce immediate connection
- # @param [Fixnum] period If given, reconnection will be delayed by this period, in seconds.
- # @api public
- def reconnect(force = false, period = 2)
- # we do this to make sure this method shows up in our documentation
- # this method is too important to leave out and YARD currently does not
- # support cross-referencing to dependencies. MK.
- super(force, period)
- end # reconnect(force = false)
-
- # A version of #reconnect that allows connecting to different endpoints (hosts).
- # @see #reconnect
- # @api public
- def reconnect_to(connection_string_or_options = {}, period = 2)
- opts = case connection_string_or_options
- when String then
- AMQP::Client.parse_connection_uri(connection_string_or_options)
- when Hash then
- connection_string_or_options
- else
- Hash.new
- end
-
- super(opts, period)
- end # reconnect_to(connection_string_or_options = {})
-
-
# Properly close connection with AMQ broker, as described in
# section 2.2.4 of the {http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}.
#
# @api plugin
# @see #close_connection
def disconnect(reply_code = 200, reply_text = "Goodbye", &block)
- # defined here to make this method appear in YARD documentation. MK.
- super(reply_code, reply_text, &block)
+ @intentionally_closing_connection = true
+ self.on_disconnection do
+ @frames.clear
+ block.call if block
+ end
+
+ # ruby-amqp/amqp#66, MK.
+ if self.open?
+ closing!
+ self.send_frame(AMQ::Protocol::Connection::Close.encode(reply_code, reply_text, 0, 0))
+ elsif self.closing?
+ # no-op
+ else
+ self.disconnection_successful
+ end
end
alias close disconnect
# @endgroup
@@ -158,117 +303,107 @@
# size and authentication succeeds. You can define more than one callback.
#
# @see #on_closed
# @api public
def on_open(&block)
- # defined here to make this method appear in YARD documentation. MK.
- super(&block)
- end # on_open(&block)
+ @connection_deferrable.callback(&block)
+ end
+ alias on_connection on_open
# @group Error Handling and Recovery
# Defines a callback that will be run when broker confirms connection termination
# (client receives connection.close-ok). You can define more than one callback.
#
# @see #on_closed
# @api public
def on_closed(&block)
- # defined here to make this method appear in YARD documentation. MK.
- super(&block)
- end # on_closed(&block)
+ @disconnection_deferrable.callback(&block)
+ end
+ alias on_disconnection on_closed
# Defines a callback that will be run when initial TCP connection fails.
# You can define only one callback.
#
# @api public
def on_tcp_connection_failure(&block)
- # defined here to make this method appear in YARD documentation. MK.
- super(&block)
+ @on_tcp_connection_failure = block
end
# Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted).
# You can define only one callback.
#
# @api public
def on_tcp_connection_loss(&block)
- # defined here to make this method appear in YARD documentation. MK.
- super(&block)
+ @on_tcp_connection_loss = block
end
# Defines a callback that will be run when TCP connection is closed before authentication
# finishes. Usually this means authentication failure. You can define only one callback.
#
# @api public
def on_possible_authentication_failure(&block)
- # defined here to make this method appear in YARD documentation. MK.
- super(&block)
+ @on_possible_authentication_failure = block
end
# Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def on_connection_interruption(&block)
- super(&block)
- end # on_connection_interruption(&block)
+ self.redefine_callback(:after_connection_interruption, &block)
+ end
alias after_connection_interruption on_connection_interruption
- # @private
- # @api plugin
- def handle_connection_interruption
- super
- end # handle_connection_interruption
-
-
# Defines a callback that will be executed when connection is closed after
# connection-level exception. Only one callback can be defined (the one defined last
# replaces previously added ones).
#
# @api public
def on_error(&block)
- super(&block)
+ self.redefine_callback(:error, &block)
end
# Defines a callback that will be executed after TCP connection has recovered after a network failure
# but before AMQP connection is re-opened.
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def before_recovery(&block)
- super(&block)
- end # before_recovery(&block)
+ self.redefine_callback(:before_recovery, &block)
+ end
# Defines a callback that will be executed after AMQP connection has recovered after a network failure..
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def on_recovery(&block)
- super(&block)
- end # on_recovery(&block)
+ self.redefine_callback(:after_recovery, &block)
+ end
alias after_recovery on_recovery
# @return [Boolean] whether connection is in the automatic recovery mode
# @api public
def auto_recovering?
- super
- end # auto_recovering?
+ !!@auto_recovery
+ end
alias auto_recovery? auto_recovering?
# Performs recovery of channels that are in the automatic recovery mode.
#
# @see Channel#auto_recover
# @see Queue#auto_recover
# @see Exchange#auto_recover
# @api plugin
def auto_recover
- super
+ @channels.select { |channel_id, ch| ch.auto_recovering? }.each { |n, ch| ch.auto_recover }
end # auto_recover
# @endgroup
@@ -295,7 +430,745 @@
# @api plugin
# @return [Class] AMQP::PossibleAuthenticationFailureError
def self.authentication_failure_exception_class
@authentication_failure_exception_class ||= AMQP::PossibleAuthenticationFailureError
end # self.authentication_failure_exception_class
+
+ # @group Connection operations
+
+ # Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection
+ # succeeds.
+ #
+ # @option settings [String] :host ("127.0.0.1") Hostname AMQ broker runs on.
+ # @option settings [String] :port (5672) Port AMQ broker listens on.
+ # @option settings [String] :vhost ("/") Virtual host to use.
+ # @option settings [String] :user ("guest") Username to use for authentication.
+ # @option settings [String] :pass ("guest") Password to use for authentication.
+ # @option settings [String] :auth_mechanism ("PLAIN") SASL authentication mechanism to use.
+ # @option settings [String] :ssl (false) Should be use TLS (SSL) for connection?
+ # @option settings [String] :timeout (nil) Connection timeout.
+ # @option settings [Fixnum] :heartbeat (0) Connection heartbeat, in seconds.
+ # @option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead.
+ #
+ # @param [Hash] settings
+ def self.connect(settings = {}, &block)
+ @settings = Settings.configure(settings)
+
+ instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
+ instance.register_connection_callback(&block)
+
+ instance
+ end
+
+ # Reconnect after a period of wait.
+ #
+ # @param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt.
+ # @param [Boolean] force If true, enforces immediate reconnection.
+ # @api public
+ def reconnect(force = false, period = 5)
+ if @reconnecting and not force
+ EventMachine::Timer.new(period) {
+ reconnect(true, period)
+ }
+ return
+ end
+
+ if !@reconnecting
+ @reconnecting = true
+ self.reset
+ end
+
+ EventMachine.reconnect(@settings[:host], @settings[:port], self)
+ end
+
+ # Similar to #reconnect, but uses different connection settings
+ # @see #reconnect
+ # @api public
+ def reconnect_to(connection_string_or_options, period = 5)
+ settings = case connection_string_or_options
+ when String then
+ AMQP.parse_connection_uri(connection_string_or_options)
+ when Hash then
+ connection_string_or_options
+ else
+ Hash.new
+ end
+
+ if !@reconnecting
+ @reconnecting = true
+ self.reset
+ end
+
+ @settings = Settings.configure(settings)
+ EventMachine.reconnect(@settings[:host], @settings[:port], self)
+ end
+
+
+ # Periodically try to reconnect.
+ #
+ # @param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt.
+ # @param [Boolean] force If true, enforces immediate reconnection.
+ # @api public
+ def periodically_reconnect(period = 5)
+ @reconnecting = true
+ self.reset
+
+ @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
+ EventMachine.reconnect(@settings[:host], @settings[:port], self)
+ }
+ end
+
+ # @endgroup
+
+ # @see #on_open
+ # @private
+ def register_connection_callback(&block)
+ unless block.nil?
+ # delay calling block we were given till after we receive
+ # connection.open-ok. Connection will notify us when
+ # that happens.
+ self.on_open do
+ block.call(self)
+ end
+ end
+ end
+
+
+
+ # For EventMachine adapter, this is a no-op.
+ # @api public
+ def establish_connection(settings)
+ # Unfortunately there doesn't seem to be any sane way
+ # how to get EventMachine connect to the instance level.
+ end
+
+ alias close disconnect
+
+
+
+ # Whether we are in authentication state (after TCP connection was estabilished
+ # but before broker authenticated us).
+ #
+ # @return [Boolean]
+ # @api public
+ def authenticating?
+ @authenticating
+ end # authenticating?
+
+ # IS TCP connection estabilished and currently active?
+ # @return [Boolean]
+ # @api public
+ def tcp_connection_established?
+ @tcp_connection_established
+ end # tcp_connection_established?
+
+
+
+
+ #
+ # Implementation
+ #
+
+ # Backwards compatibility with 0.7.0.a25. MK.
+ Deferrable = EventMachine::DefaultDeferrable
+
+
+ alias send_raw send_data
+
+
+ # EventMachine reactor callback. Is run when TCP connection is estabilished
+ # but before resumption of the network loop. Note that this includes cases
+ # when TCP connection has failed.
+ # @private
+ def post_init
+ reset
+
+ # note that upgrading to TLS in #connection_completed causes
+ # Erlang SSL app that RabbitMQ relies on to report
+ # error on TCP connection <0.1465.0>:{ssl_upgrade_error,"record overflow"}
+ # and close TCP connection down. Investigation of this issue is likely
+ # to take some time and to not be worth in as long as #post_init
+ # works fine. MK.
+ upgrade_to_tls_if_necessary
+ rescue Exception => error
+ raise error
+ end # post_init
+
+
+
+ # Called by EventMachine reactor once TCP connection is successfully estabilished.
+ # @private
+ def connection_completed
+ # we only can safely set this value here because EventMachine is a lovely piece of
+ # software that calls #post_init before #unbind even when TCP connection
+ # fails. MK.
+ @tcp_connection_established = true
+ @periodic_reconnection_timer.cancel if @periodic_reconnection_timer
+
+
+ # again, this is because #unbind is called in different situations
+ # and there is no easy way to tell initial connection failure
+ # from connection loss. Not in EventMachine 0.12.x, anyway. MK.
+
+ if @had_successfully_connected_before
+ @recovered = true
+
+
+ self.start_automatic_recovery
+ self.upgrade_to_tls_if_necessary
+ end
+
+ # now we can set it. MK.
+ @had_successfully_connected_before = true
+ @reconnecting = false
+ @handling_skipped_hearbeats = false
+ @last_server_heartbeat = Time.now
+
+ self.handshake
+ end
+
+ # @private
+ def close_connection(*args)
+ @intentionally_closing_connection = true
+
+ super(*args)
+ end
+
+ # Called by EventMachine reactor when
+ #
+ # * We close TCP connection down
+ # * Our peer closes TCP connection down
+ # * There is a network connection issue
+ # * Initial TCP connection fails
+ # @private
+ def unbind(exception = nil)
+ if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection
+ @tcp_connection_failed = true
+ logger.error "[amqp] Detected TCP connection failure"
+ self.tcp_connection_failed
+ end
+
+ closing!
+ @tcp_connection_established = false
+
+ self.handle_connection_interruption if @reconnecting
+ @disconnection_deferrable.succeed
+
+ closed!
+
+
+ self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfully_connected_before
+
+ # since AMQP spec dictates that authentication failure is a protocol exception
+ # and protocol exceptions result in connection closure, check whether we are
+ # in the authentication stage. If so, it is likely to signal an authentication
+ # issue. Java client behaves the same way. MK.
+ if authenticating? && !@intentionally_closing_connection
+ @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure
+ end
+ end # unbind
+
+
+ #
+ # EventMachine receives data in chunks, sometimes those chunks are smaller
+ # than the size of AMQP frame. That's why you need to add some kind of buffer.
+ #
+ # @private
+ def receive_data(chunk)
+ @chunk_buffer << chunk
+ while frame = get_next_frame
+ self.receive_frame(AMQP::Framing::String::Frame.decode(frame))
+ end
+ end
+
+
+ # Called by AMQP::Connection after we receive connection.open-ok.
+ # @api public
+ def connection_successful
+ @authenticating = false
+ opened!
+
+ @connection_deferrable.succeed
+ end # connection_successful
+
+
+ # Called by AMQP::Connection after we receive connection.close-ok.
+ #
+ # @api public
+ def disconnection_successful
+ @disconnection_deferrable.succeed
+
+ # true for "after writing buffered data"
+ self.close_connection(true)
+ self.reset
+ closed!
+ end # disconnection_successful
+
+ # Called when time since last server heartbeat received is greater or equal to the
+ # heartbeat interval set via :heartbeat_interval option on connection.
+ #
+ # @api plugin
+ def handle_skipped_hearbeats
+ if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
+ @handling_skipped_hearbeats = true
+ self.cancel_heartbeat_sender
+
+ self.run_skipped_heartbeats_callbacks
+ end
+ end
+
+ # @private
+ def initialize_heartbeat_sender
+ @last_server_heartbeat = Time.now
+ @heartbeats_timer = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat))
+ end
+
+ # @private
+ def cancel_heartbeat_sender
+ @heartbeats_timer.cancel if @heartbeats_timer
+ end
+
+
+
+ # Sends AMQ protocol header (also known as preamble).
+ #
+ # @note This must be implemented by all AMQP clients.
+ # @api plugin
+ # @see http://bit.ly/amqp091spec AMQP 0.9.1 specification (Section 2.2)
+ def send_preamble
+ self.send_raw(AMQ::Protocol::PREAMBLE)
+ end
+
+ # Sends frame to the peer, checking that connection is open.
+ #
+ # @raise [ConnectionClosedError]
+ def send_frame(frame)
+ if closed?
+ raise ConnectionClosedError.new(frame)
+ else
+ self.send_raw(frame.encode)
+ end
+ end
+
+ # Sends multiple frames, one by one. For thread safety this method takes a channel
+ # object and synchronizes on it.
+ #
+ # @api public
+ def send_frameset(frames, channel)
+ # some (many) developers end up sharing channels between threads and when multiple
+ # threads publish on the same channel aggressively, at some point frames will be
+ # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
+ # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
+ # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
+ channel.synchronize do
+ frames.each { |frame| self.send_frame(frame) }
+ end
+ end # send_frameset(frames)
+
+
+
+ # Returns heartbeat interval this client uses, in seconds.
+ # This value may or may not be used depending on broker capabilities.
+ # Zero means the server does not want a heartbeat.
+ #
+ # @return [Fixnum] Heartbeat interval this client uses, in seconds.
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)
+ def heartbeat_interval
+ @heartbeat_interval
+ end # heartbeat_interval
+
+ # Returns true if heartbeats are enabled (heartbeat interval is greater than 0)
+ # @return [Boolean]
+ def heartbeats_enabled?
+ @heartbeat_interval && (@heartbeat_interval > 0)
+ end
+
+
+ # vhost this connection uses. Default is "/", a historically estabilished convention
+ # of RabbitMQ and amqp gem.
+ #
+ # @return [String] vhost this connection uses
+ # @api public
+ def vhost
+ @settings.fetch(:vhost, "/")
+ end # vhost
+
+
+
+ # @group Error Handling and Recovery
+
+ # Called when initial TCP connection fails.
+ # @api public
+ def tcp_connection_failed
+ @recovered = false
+
+ @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
+ end
+
+ # Called when previously established TCP connection fails.
+ # @api public
+ def tcp_connection_lost
+ @recovered = false
+
+ @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
+ self.handle_connection_interruption
+ end
+
+ # @return [Boolean]
+ def reconnecting?
+ @reconnecting
+ end # reconnecting?
+
+ # @private
+ # @api plugin
+ def handle_connection_interruption
+ self.cancel_heartbeat_sender
+
+ @channels.each { |n, c| c.handle_connection_interruption }
+ self.exec_callback_yielding_self(:after_connection_interruption)
+ end
+
+
+
+ # @private
+ def run_before_recovery_callbacks
+ self.exec_callback_yielding_self(:before_recovery, @settings)
+
+ @channels.each { |n, ch| ch.run_before_recovery_callbacks }
+ end
+
+
+ # @private
+ def run_after_recovery_callbacks
+ self.exec_callback_yielding_self(:after_recovery, @settings)
+
+ @channels.each { |n, ch| ch.run_after_recovery_callbacks }
+ end
+
+
+ # Performs recovery of channels that are in the automatic recovery mode. "before recovery" callbacks
+ # are run immediately, "after recovery" callbacks are run after AMQP connection is re-established and
+ # auto recovery is performed (using #auto_recover).
+ #
+ # Use this method if you want to run automatic recovery process after handling a connection-level exception,
+ # for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).
+ #
+ # @see Channel#auto_recover
+ # @see Queue#auto_recover
+ # @see Exchange#auto_recover
+ # @api plugin
+ def start_automatic_recovery
+ self.run_before_recovery_callbacks
+ self.register_connection_callback do
+ # always run automatic recovery, because it is per-channel
+ # and connection has to start it. Channels that did not opt-in for
+ # autorecovery won't be selected. MK.
+ self.auto_recover
+ self.run_after_recovery_callbacks
+ end
+ end # start_automatic_recovery
+
+
+ # Defines a callback that will be executed after time since last broker heartbeat is greater
+ # than or equal to the heartbeat interval (skipped heartbeat is detected).
+ # Only one callback can be defined (the one defined last replaces previously added ones).
+ #
+ # @api public
+ def on_skipped_heartbeats(&block)
+ self.redefine_callback(:skipped_heartbeats, &block)
+ end # on_skipped_heartbeats(&block)
+
+ # @private
+ def run_skipped_heartbeats_callbacks
+ self.exec_callback_yielding_self(:skipped_heartbeats, @settings)
+ end
+
+ # @endgroup
+
+
+
+
+ #
+ # Implementation
+ #
+
+ # Sends connection preamble to the broker.
+ # @api plugin
+ def handshake
+ @authenticating = true
+ self.send_preamble
+ end
+
+
+ # Sends connection.open to the server.
+ #
+ # @api plugin
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.7)
+ def open(vhost = "/")
+ self.send_frame(AMQ::Protocol::Connection::Open.encode(vhost))
+ end
+
+ # Resets connection state.
+ #
+ # @api plugin
+ def reset_state!
+ # no-op by default
+ end # reset_state!
+
+ # @api plugin
+ # @see http://tools.ietf.org/rfc/rfc2595.txt RFC 2595
+ def encode_credentials(username, password)
+ auth_mechanism_adapter.encode_credentials(username, password)
+ end # encode_credentials(username, password)
+
+ # Retrieves an AuthMechanismAdapter that will encode credentials for
+ # this Adapter.
+ #
+ # @api plugin
+ def auth_mechanism_adapter
+ @auth_mechanism_adapter ||= AuthMechanismAdapter.for_adapter(self)
+ end
+
+
+ # Processes a single frame.
+ #
+ # @param [AMQ::Protocol::Frame] frame
+ # @api plugin
+ def receive_frame(frame)
+ @frames[frame.channel] ||= Array.new
+ @frames[frame.channel] << frame
+
+ if frameset_complete?(@frames[frame.channel])
+ receive_frameset(@frames[frame.channel])
+ # for channel.close, frame.channel will be nil. MK.
+ clear_frames_on(frame.channel) if @frames[frame.channel]
+ end
+ end
+
+ # Processes a frameset by finding and invoking a suitable handler.
+ # Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat
+ # value.
+ #
+ # @param [Array<AMQ::Protocol::Frame>] frames
+ # @api plugin
+ def receive_frameset(frames)
+ if self.heartbeats_enabled?
+ # treat incoming traffic as heartbeats.
+ # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
+ # (and are also disabled by default). MK.
+ @last_server_heartbeat = Time.now
+ end
+ frame = frames.first
+
+ if AMQ::Protocol::HeartbeatFrame === frame
+ # no-op
+ else
+ if callable = AMQP::HandlersRegistry.find(frame.method_class)
+ f = frames.shift
+ callable.call(self, f, frames)
+ else
+ raise MissingHandlerError.new(frames.first)
+ end
+ end
+ end
+
+ # Clears frames that were received but not processed on given channel. Needs to be called
+ # when the channel is closed.
+ # @private
+ def clear_frames_on(channel_id)
+ raise ArgumentError, "channel id cannot be nil!" if channel_id.nil?
+
+ @frames[channel_id].clear
+ end
+
+ # Sends a heartbeat frame if connection is open.
+ # @api plugin
+ def send_heartbeat
+ if tcp_connection_established? && !@handling_skipped_hearbeats && @last_server_heartbeat
+ if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting?
+ logger.error "[amqp] Detected missing server heartbeats"
+ self.handle_skipped_hearbeats
+ end
+ send_frame(AMQ::Protocol::HeartbeatFrame)
+ end
+ end # send_heartbeat
+
+
+
+
+
+
+ # Handles connection.start.
+ #
+ # @api plugin
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.1.)
+ def handle_start(connection_start)
+ @server_properties = connection_start.server_properties
+ @server_capabilities = @server_properties["capabilities"]
+
+ @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
+ @server_locales = Array(connection_start.locales)
+
+ username = @settings[:user] || @settings[:username]
+ password = @settings[:pass] || @settings[:password]
+
+ # It's not clear whether we should transition to :opening state here
+ # or in #open but in case authentication fails, it would be strange to have
+ # @status undefined. So lets do this. MK.
+ opening!
+
+ self.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, mechanism, self.encode_credentials(username, password), @locale))
+ end
+
+
+ # Handles Connection.Tune-Ok.
+ #
+ # @api plugin
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.6)
+ def handle_tune(connection_tune)
+ @channel_max = connection_tune.channel_max.freeze
+ @frame_max = connection_tune.frame_max.freeze
+
+ client_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
+
+ @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat)
+
+ self.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
+ self.initialize_heartbeat_sender if heartbeats_enabled?
+ end # handle_tune(method)
+
+
+ # Handles Connection.Open-Ok.
+ #
+ # @api plugin
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.8.)
+ def handle_open_ok(open_ok)
+ @known_hosts = open_ok.known_hosts.dup.freeze
+
+ opened!
+ self.connection_successful if self.respond_to?(:connection_successful)
+ end
+
+
+ # Handles connection.close. When broker detects a connection level exception, this method is called.
+ #
+ # @api plugin
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.5.2.9)
+ def handle_close(conn_close)
+ closed!
+ self.exec_callback_yielding_self(:error, conn_close)
+ end
+
+
+ # Handles Connection.Close-Ok.
+ #
+ # @api plugin
+ # @see http://bit.ly/amqp091reference AMQP 0.9.1 protocol reference (Section 1.4.2.10)
+ def handle_close_ok(close_ok)
+ closed!
+ self.disconnection_successful
+ end # handle_close_ok(close_ok)
+
+
+
+ protected
+
+ def negotiate_heartbeat_value(client_value, server_value)
+ if client_value == 0 || server_value == 0
+ [client_value, server_value].max
+ else
+ [client_value, server_value].min
+ end
+ end
+
+ # Returns next frame from buffer whenever possible
+ #
+ # @api private
+ def get_next_frame
+ return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
+ # octet + short
+ offset = 3 # 1 + 2
+ # length
+ payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
+ # 4 bytes for long payload length, 1 byte final octet
+ frame_length = offset + payload_length + 5
+ if frame_length <= @chunk_buffer.size
+ @chunk_buffer.slice!(0, frame_length)
+ else
+ nil
+ end
+ end # def get_next_frame
+
+ # Utility methods
+
+ # Determines, whether the received frameset is ready to be further processed
+ def frameset_complete?(frames)
+ return false if frames.empty?
+ first_frame = frames[0]
+ first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
+ end
+
+ # Determines, whether given frame array contains full content body
+ def content_complete?(frames)
+ return false if frames.empty?
+ header = frames[0]
+ raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
+ header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
+ end
+
+
+
+ self.handle(AMQ::Protocol::Connection::Start) do |connection, frame|
+ connection.handle_start(frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Connection::Tune) do |connection, frame|
+ connection.handle_tune(frame.decode_payload)
+
+ connection.open(connection.vhost)
+ end
+
+ self.handle(AMQ::Protocol::Connection::OpenOk) do |connection, frame|
+ connection.handle_open_ok(frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Connection::Close) do |connection, frame|
+ connection.handle_close(frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Connection::CloseOk) do |connection, frame|
+ connection.handle_close_ok(frame.decode_payload)
+ end
+
+
+
+
+ protected
+
+
+ def reset
+ @size = 0
+ @payload = ""
+ @frames = Array.new
+
+ @chunk_buffer = ""
+ @connection_deferrable = EventMachine::DefaultDeferrable.new
+ @disconnection_deferrable = EventMachine::DefaultDeferrable.new
+
+ # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
+ # that on authentication failure broker must close TCP connection without sending
+ # any more data. This is why we need to explicitly track whether we are past
+ # authentication stage to signal possible authentication failures.
+ @authenticating = false
+ end
+
+ def upgrade_to_tls_if_necessary
+ tls_options = @settings[:ssl]
+
+ if tls_options.is_a?(Hash)
+ start_tls(tls_options)
+ elsif tls_options
+ start_tls
+ end
+ end # upgrade_to_tls_if_necessary
end # Session
end # AMQP