lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.9.3 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-1.0.0
- old
+ new
@@ -1,5 +1,7 @@
+require 'securerandom'
+
module Ably
module Realtime
# The Connection class represents the connection associated with an Ably Realtime instance.
# The Connection object exposes the lifecycle and parameters of the realtime connection.
#
@@ -23,12 +25,10 @@
# Connection::STATE.Suspended
# Connection::STATE.Closing
# Connection::STATE.Closed
# Connection::STATE.Failed
#
- # Connection emit errors - use `on(:error)` to subscribe to errors
- #
# @example
# client = Ably::Realtime::Client.new('key.id:secret')
# client.connection.on(:connected) do
# puts "Connected with connection ID: #{client.connection.id}"
# end
@@ -40,35 +40,45 @@
include Ably::Modules::EventEmitter
include Ably::Modules::Conversions
include Ably::Modules::SafeYield
extend Ably::Modules::Enum
- # Valid Connection states
+ # ConnectionState
+ # The permited states for this connection
STATE = ruby_enum('STATE',
:initialized,
:connecting,
:connected,
:disconnected,
:suspended,
:closing,
:closed,
:failed
)
+
+ # ConnectionEvent
+ # The permitted connection events that are emitted for this connection
+ EVENT = ruby_enum('EVENT',
+ STATE.to_sym_arr + [:update]
+ )
+
include Ably::Modules::StateEmitter
include Ably::Modules::UsesStateMachine
ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'
# Expected format for a connection recover key
RECOVER_REGEX = /^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/
# Defaults for automatic connection recovery and timeouts
DEFAULTS = {
+ channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
connection_state_ttl: 120, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
- max_connection_state_ttl: nil, # allow a max TTL to be passed in for CI test purposes thus overiding any connection_state_ttl sent from Ably
+ max_connection_state_ttl: nil, # allow a max TTL to be passed in, usually for CI test purposes thus overiding any connection_state_ttl sent from Ably
realtime_request_timeout: 10, # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
+ websocket_heartbeats_disabled: false,
}.freeze
# A unique public identifier for this connection, used to identify this member in presence events and messages
# @return [String]
attr_reader :id
@@ -120,13 +130,13 @@
attr_reader :defaults
# @api public
def initialize(client, options)
@client = client
- @client_serial = -1
@__outgoing_message_queue__ = []
@__pending_message_ack_queue__ = []
+ reset_client_serial
@defaults = DEFAULTS.dup
options.each do |key, val|
@defaults[key] = val if DEFAULTS.has_key?(key)
end if options.kind_of?(Hash)
@@ -148,11 +158,13 @@
#
# @return [EventMachine::Deferrable]
#
def close(&success_block)
unless closing? || closed?
- raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
+ unless can_transition_to?(:closing)
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:closing))
+ end
transition_state_machine :closing
end
deferrable_for_state_change_to(STATE.Closed, &success_block)
end
@@ -168,27 +180,30 @@
#
# @return [EventMachine::Deferrable]
#
def connect(&success_block)
unless connecting? || connected?
- raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
- transition_state_machine :connecting
+ unless can_transition_to?(:connecting)
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:connecting))
+ end
+ # If connect called in a suspended block, we want to ensure the other callbacks have finished their work first
+ EventMachine.next_tick { transition_state_machine :connecting if can_transition_to?(:connecting) }
end
Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
deferrable.callback do
yield if block_given?
end
succeed_callback = deferrable.method(:succeed)
fail_callback = deferrable.method(:fail)
- once(:connected) do
+ unsafe_once(:connected) do
deferrable.succeed
off(&fail_callback)
end
- once(:failed, :closed, :closing) do
+ unsafe_once(:failed, :closed, :closing) do
deferrable.fail
off(&succeed_callback)
end
end
end
@@ -204,43 +219,58 @@
# client = Ably::Rest::Client.new(key: 'key.id:secret')
# client.connection.ping do |elapsed_s|
# puts "Ping took #{elapsed_s}s"
# end
#
- # @return [void]
+ # @return [Ably::Util::SafeDeferrable]
#
def ping(&block)
- raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
- raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?
+ if initialized? || suspended? || closing? || closed? || failed?
+ error = Ably::Models::ErrorInfo.new(message: "Cannot send a ping when the connection is #{state}", code: 80003)
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
+ end
- started = nil
- finished = false
+ Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
+ started = nil
+ finished = false
+ ping_id = SecureRandom.hex(16)
+ heartbeat_action = Ably::Models::ProtocolMessage::ACTION.Heartbeat
- wait_for_ping = Proc.new do |protocol_message|
- next if finished
- if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
+ wait_for_ping = Proc.new do |protocol_message|
+ next if finished
+ if protocol_message.action == heartbeat_action && protocol_message.id == ping_id
+ finished = true
+ __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
+ time_passed = Time.now.to_f - started.to_f
+ deferrable.succeed time_passed
+ safe_yield block, time_passed if block_given?
+ end
+ end
+
+ once_or_if(STATE.Connected) do
+ next if finished
+ started = Time.now
+ send_protocol_message action: heartbeat_action.to_i, id: ping_id
+ __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
+ end
+
+ once_or_if([:suspended, :closing, :closed, :failed]) do
+ next if finished
finished = true
+ deferrable.fail Ably::Models::ErrorInfo.new(message: "Ping failed as connection has changed state to #{state}", code: 80003)
+ end
+
+ EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
+ next if finished
+ finished = true
__incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
- time_passed = Time.now.to_f - started.to_f
- safe_yield block, time_passed if block_given?
+ error_msg = "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
+ logger.warn { error_msg }
+ deferrable.fail Ably::Models::ErrorInfo.new(message: error_msg, code: 50003)
+ safe_yield block, nil if block_given?
end
end
-
- once_or_if(STATE.Connected) do
- next if finished
- started = Time.now
- send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
- __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
- end
-
- EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
- next if finished
- finished = true
- __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
- logger.warn "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
- safe_yield block, nil if block_given?
- end
end
# @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN
# @return [EventMachine::Deferrable]
# @api private
@@ -362,11 +392,11 @@
def send_protocol_message(protocol_message)
add_message_serial_if_ack_required_to(protocol_message) do
Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
add_message_to_outgoing_queue message
notify_message_dispatcher_of_new_message message
- logger.debug("Connection: Prot msg queued =>: #{message.action} #{message}")
+ logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" }
end
end
end
# @api private
@@ -385,36 +415,39 @@
EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
# Getting auth params can be blocking so uses a Deferrable
client.auth.auth_params.tap do |auth_deferrable|
auth_deferrable.callback do |auth_params|
url_params = auth_params.merge(
- format: client.protocol,
- echo: client.echo_messages,
- v: Ably::PROTOCOL_VERSION,
- lib: client.rest_client.lib_version_id,
+ format: client.protocol,
+ echo: client.echo_messages,
+ v: Ably::PROTOCOL_VERSION,
+ lib: client.rest_client.lib_version_id,
)
+ # Use native websocket heartbeats if possible
+ url_params['heartbeats'] = 'false' unless defaults.fetch(:websocket_heartbeats_disabled)
+
url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?
if connection_resumable?
url_params.merge! resume: key, connection_serial: serial
- logger.debug "Resuming connection key #{key} with serial #{serial}"
+ logger.debug { "Resuming connection key #{key} with serial #{serial}" }
elsif connection_recoverable?
- url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
- logger.debug "Recovering connection with key #{client.recover}"
- once(:connected, :closed, :failed) do
+ url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial]
+ logger.debug { "Recovering connection with key #{client.recover}" }
+ unsafe_once(:connected, :closed, :failed) do
client.disable_automatic_connection_recovery
end
end
url = URI(client.endpoint).tap do |endpoint|
endpoint.query = URI.encode_www_form(url_params)
end
determine_host do |host|
begin
- logger.debug "Connection: Opening socket connection to #{host}:#{port}/#{url.path}?#{url.query}"
+ logger.debug { "Connection: Opening socket connection to #{host}:#{port}/#{url.path}?#{url.query}" }
@transport = create_transport(host, port, url) do |websocket_transport|
websocket_deferrable.succeed websocket_transport
end
rescue EventMachine::ConnectionError => error
websocket_deferrable.fail error
@@ -449,11 +482,11 @@
@details = connection_details
end
# Executes registered callbacks for a successful connection resume event
# @api private
- def resumed
+ def trigger_resumed
resume_callbacks.each(&:call)
end
# Provides a simple hook to inject a callback when a connection is successfully resumed
# @api private
@@ -488,22 +521,48 @@
def connection_state_ttl=(val)
@connection_state_ttl = val
end
+ # @api private
+ def heartbeat_interval
+ # See RTN23a
+ (details && details.max_idle_interval).to_i +
+ defaults.fetch(:realtime_request_timeout)
+ end
+
+ # Resets the client serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage}
+ # (see #client_serial)
+ # @api private
+ def reset_client_serial
+ @client_serial = -1
+ end
+
+ # When a hearbeat or any other message from Ably is received
+ # we know it's alive, see #RTN23
+ # @api private
+ def set_connection_confirmed_alive
+ @last_liveness_event = Time.now
+ manager.reset_liveness_timer
+ end
+
+ # @api private
+ def time_since_connection_confirmed_alive?
+ Time.now.to_i - @last_liveness_event.to_i
+ end
+
# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state
private
# The client serial is incremented for every message that is published that requires an ACK.
# Note that this is different to the connection serial that contains the last known serial number
# received from the server.
#
- # A client serial number therefore does not guarantee a message has been received, only sent.
- # A connection serial guarantees the server has received the message and is thus used for connection
- # recovery and resumes.
+ # A message serial number does not guarantee a message has been received, only sent.
+ # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes.
# @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
def client_serial
@client_serial
end