lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-1.2.4 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-1.2.6
- old
+ new
@@ -7,11 +7,13 @@
class Connection
include Ably::Modules::EventEmitter
include Ably::Modules::Conversions
include Ably::Modules::SafeYield
extend Ably::Modules::Enum
+ using Ably::Util::AblyExtensions
+
# The current {Ably::Realtime::Connection::STATE} of the connection.
# Describes the realtime [Connection]{@link Connection} object states.
#
# @spec RTN4d
#
@@ -21,30 +23,30 @@
# CONNECTED A connection exists and is active.
# DISCONNECTED A temporary failure condition. No current connection exists because there is no network connectivity
# or no host is available. The disconnected state is entered if an established connection is dropped,
# or if a connection attempt was unsuccessful. In the disconnected state the library will periodically
# attempt to open a new connection (approximately every 15 seconds), anticipating that the connection
- # will be re-established soon and thus connection and channel continuity will be possible.
+ # will be re-established soon and thus connection and channel continuity will be possible.
# In this state, developers can continue to publish messages as they are automatically placed
- # in a local queue, to be sent as soon as a connection is reestablished. Messages published by
+ # in a local queue, to be sent as soon as a connection is reestablished. Messages published by
# other clients while this client is disconnected will be delivered to it upon reconnection,
- # so long as the connection was resumed within 2 minutes. After 2 minutes have elapsed, recovery
+ # so long as the connection was resumed within 2 minutes. After 2 minutes have elapsed, recovery
# is no longer possible and the connection will move to the SUSPENDED state.
- # SUSPENDED A long term failure condition. No current connection exists because there is no network connectivity
- # or no host is available. The suspended state is entered after a failed connection attempt if
- # there has then been no connection for a period of two minutes. In the suspended state, the library
- # will periodically attempt to open a new connection every 30 seconds. Developers are unable to
+ # SUSPENDED A long term failure condition. No current connection exists because there is no network connectivity
+ # or no host is available. The suspended state is entered after a failed connection attempt if
+ # there has then been no connection for a period of two minutes. In the suspended state, the library
+ # will periodically attempt to open a new connection every 30 seconds. Developers are unable to
# publish messages in this state. A new connection attempt can also be triggered by an explicit
- # call to {Ably::Realtime::Connection#connect}. Once the connection has been re-established,
- # channels will be automatically re-attached. The client has been disconnected for too long for them
- # to resume from where they left off, so if it wants to catch up on messages published by other clients
+ # call to {Ably::Realtime::Connection#connect}. Once the connection has been re-established,
+ # channels will be automatically re-attached. The client has been disconnected for too long for them
+ # to resume from where they left off, so if it wants to catch up on messages published by other clients
# while it was disconnected, it needs to use the History API.
- # CLOSING An explicit request by the developer to close the connection has been sent to the Ably service.
- # If a reply is not received from Ably within a short period of time, the connection is forcibly
+ # CLOSING An explicit request by the developer to close the connection has been sent to the Ably service.
+ # If a reply is not received from Ably within a short period of time, the connection is forcibly
# terminated and the connection state becomes CLOSED.
- # CLOSED The connection has been explicitly closed by the client. In the closed state, no reconnection attempts
- # are made automatically by the library, and clients may not publish messages. No connection state is
+ # CLOSED The connection has been explicitly closed by the client. In the closed state, no reconnection attempts
+ # are made automatically by the library, and clients may not publish messages. No connection state is
# preserved by the service or by the library. A new connection attempt can be triggered by an explicit
# call to {Ably::Realtime::Connection#connect}, which results in a new connection.
# FAILED This state is entered if the client library encounters a failure condition that it cannot recover from.
# This may be a fatal connection error received from the Ably service, for example an attempt to connect
# with an incorrect API key, or a local terminal error, for example the token in use has expired
@@ -75,13 +77,10 @@
include Ably::Modules::StateEmitter
include Ably::Modules::UsesStateMachine
ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'
- # Expected format for a connection recover key
- RECOVER_REGEX = /^(?<recover>[^:]+):(?<connection_serial>[^:]+):(?<msg_serial>\-?\d+)$/
-
# Defaults for automatic connection recovery and timeouts
DEFAULTS = {
channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
@@ -111,20 +110,10 @@
#
# @return [String]
#
attr_reader :key
- # The serial number of the last message to be received on this connection, used automatically by the library when
- # recovering or resuming a connection. When recovering a connection explicitly, the recoveryKey is used in
- # the recover client options as it contains both the key and the last message serial.
- #
- # @spec RTN10
- #
- # @return [Integer]
- #
- attr_reader :serial
-
# An {Ably::Models::ErrorInfo} object describing the last error received if a connection failure occurs.
#
# @spec RTN14a
#
# @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
@@ -175,29 +164,20 @@
options.each do |key, val|
@defaults[key] = val if DEFAULTS.has_key?(key)
end if options.kind_of?(Hash)
@defaults.freeze
- # If a recover client options is provided, then we need to ensure that the msgSerial matches the
- # recover serial immediately at client library instantiation. This is done immediately so that any queued
- # publishes use the correct serial number for these queued messages as well.
- # There is no harm if the msgSerial is higher than expected if the recover fails.
- recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i
- if recovery_msg_serial
- @client_msg_serial = recovery_msg_serial
- else
- reset_client_msg_serial
- end
-
Client::IncomingMessageDispatcher.new client, self
Client::OutgoingMessageDispatcher.new client, self
@state_machine = ConnectionStateMachine.new(self)
@state = STATE(state_machine.current_state)
@manager = ConnectionManager.new(self)
@current_host = client.endpoint.host
+
+ reset_client_msg_serial
end
# Causes the connection to close, entering the {Ably::Realtime::Connection::STATE} CLOSING state.
# Once closed, the library does not attempt to re-establish the connection without an explicit call to
# {Ably::Realtime::Connection#connect}.
@@ -325,11 +305,11 @@
# @return [EventMachine::Deferrable]
# @api private
def internet_up?
url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
EventMachine::DefaultDeferrable.new.tap do |deferrable|
- EventMachine::HttpRequest.new(url, tls: { verify_peer: true }).get.tap do |http|
+ EventMachine::AblyHttpRequest::HttpRequest.new(url, tls: { verify_peer: true }).get.tap do |http|
http.errback do
yield false if block_given?
deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED)
end
http.callback do
@@ -345,45 +325,52 @@
end
end
end
end
- # The recovery key string can be used by another client to recover this connection's state in the recover client options property. See connection state recover options for more information.
+ # The recovery key string can be used by another client to recover this connection's state in the
+ # recover client options property. See connection state recover options for more information.
#
# @spec RTN16b, RTN16c
#
- # @return [String]
+ # @deprecated Use {#create_recovery_key} instead
#
def recovery_key
- "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable?
+ logger.warn "[DEPRECATION] recovery_key is deprecated, use create_recovery_key method instead"
+ create_recovery_key
end
+ # The recovery key string can be used by another client to recover this connection's state in the recover client
+ # options property. See connection state recover options for more information.
+ #
+ # @spec RTN16g, RTN16c
+ #
+ # @return [String] a json string which incorporates the @connectionKey@, the current @msgSerial@ and collection
+ # of pairs of channel @name@ and current @channelSerial@ for every currently attached channel
+ def create_recovery_key
+ if key.nil_or_empty? || state == :closing || state == :closed || state == :failed || state == :suspended
+ return nil #RTN16g2
+ end
+ RecoveryKeyContext.new(key, client_msg_serial, client.channels.get_channel_serials).to_json
+ end
+
# Following a new connection being made, the connection ID, connection key
- # and connection serial need to match the details provided by the server.
+ # need to match the details provided by the server.
#
# @return [void]
# @api private
- def configure_new(connection_id, connection_key, connection_serial)
+ def configure_new(connection_id, connection_key)
@id = connection_id
@key = connection_key
-
- update_connection_serial connection_serial
end
- # Store last received connection serial so that the connection can be resumed from the last known point-in-time
- # @return [void]
- # @api private
- def update_connection_serial(connection_serial)
- @serial = connection_serial
- end
-
# Disable automatic resume of a connection
# @return [void]
# @api private
def reset_resume_info
@key = nil
- @serial = nil
+ @id = nil
end
# @!attribute [r] __outgoing_protocol_msgbus__
# @return [Ably::Util::PubSub] Client library internal outgoing protocol message bus
# @api private
@@ -442,21 +429,32 @@
# @param [Ably::Models::ProtocolMessage] protocol_message
# @return [void]
# @api private
def send_protocol_message(protocol_message)
add_message_serial_if_ack_required_to(protocol_message) do
- Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
- add_message_to_outgoing_queue message
- notify_message_dispatcher_of_new_message message
- logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" }
- end
+ message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger)
+ add_message_to_outgoing_queue(message)
+ notify_message_dispatcher_of_new_message message
end
end
+ def send_protocol_message_immediately(protocol_message)
+ message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger)
+ add_message_to_outgoing_queue(message, true)
+ notify_message_dispatcher_of_new_message message
+ end
+
# @api private
- def add_message_to_outgoing_queue(protocol_message)
- __outgoing_message_queue__ << protocol_message
+ def add_message_to_outgoing_queue(protocol_message, send_immediately = false)
+ if send_immediately
+ # Adding msg at the top of the queue to get processed immediately while connection is CONNECTED
+ __outgoing_message_queue__.prepend(protocol_message)
+ logger.debug { "Connection: protocol msg pushed at the top =>: #{protocol_message.action} #{protocol_message}" }
+ else
+ __outgoing_message_queue__ << protocol_message
+ logger.debug { "Connection: protocol msg queued =>: #{protocol_message.action} #{protocol_message}" }
+ end
end
# @api private
def notify_message_dispatcher_of_new_message(protocol_message)
__outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
@@ -470,32 +468,33 @@
client.auth.auth_params.tap do |auth_deferrable|
auth_deferrable.callback do |auth_params|
url_params = auth_params.merge(
'format' => client.protocol,
'echo' => client.echo_messages,
- 'v' => Ably::PROTOCOL_VERSION,
+ 'v' => Ably::PROTOCOL_VERSION, # RSC7a
'agent' => client.rest_client.agent
)
# Use native websocket heartbeats if possible, but allow Ably protocol heartbeats
url_params['heartbeats'] = if defaults.fetch(:websocket_heartbeats_disabled)
'true'
else
'false'
end
-
- url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?
+ # RSA7e1
+ url_params['clientId'] = client.auth.client_id_for_request_sync if client.auth.client_id_for_request_sync
url_params.merge!(client.transport_params)
- if connection_resumable?
- url_params.merge! resume: key, connection_serial: serial
- logger.debug { "Resuming connection key #{key} with serial #{serial}" }
- elsif connection_recoverable?
- url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial]
- logger.debug { "Recovering connection with key #{client.recover}" }
- unsafe_once(:connected, :closed, :failed) do
- client.disable_automatic_connection_recovery
+ if !key.nil_or_empty? and connection_state_available?
+ url_params.merge! resume: key
+ logger.debug { "Resuming connection with key #{key}" }
+ elsif !client.recover.nil_or_empty?
+ recovery_context = RecoveryKeyContext.from_json(client.recover, logger)
+ unless recovery_context.nil?
+ key = recovery_context.connection_key
+ logger.debug { "Recovering connection with key #{key}" }
+ url_params.merge! recover: key
end
end
url = URI(client.endpoint).tap do |endpoint|
endpoint.query = URI.encode_www_form(url_params)
@@ -542,28 +541,10 @@
# @api private
def set_connection_details(connection_details)
@details = connection_details
end
- # Executes registered callbacks for a successful connection resume event
- # @api private
- def trigger_resumed
- resume_callbacks.each(&:call)
- end
-
- # Provides a simple hook to inject a callback when a connection is successfully resumed
- # @api private
- def on_resume(&callback)
- resume_callbacks << callback
- end
-
- # Remove a registered connection resume callback
- # @api private
- def off_resume(&callback)
- resume_callbacks.delete(callback)
- end
-
# Returns false if messages cannot be published as a result of message queueing being disabled
# @api private
def can_publish_messages?
connected? ||
( (initialized? || connecting? || disconnected?) && client.queue_messages )
@@ -598,10 +579,16 @@
# @api private
def reset_client_msg_serial
@client_msg_serial = -1
end
+ # Sets the client message serial from recover clientOption.
+ # @api private
+ def set_msg_serial_from_recover=(value)
+ @client_msg_serial = value
+ end
+
# When a hearbeat or any other message from Ably is received
# we know it's alive, see #RTN23
# @api private
def set_connection_confirmed_alive
@last_liveness_event = Time.now
@@ -618,24 +605,16 @@
private :change_state
private
# The client message serial (msgSerial) is incremented for every message that is published that requires an ACK.
- # Note that this is different to the connection serial that contains the last known serial number
- # received from the server.
- #
# A message serial number does not guarantee a message has been received, only sent.
- # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes.
# @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
def client_msg_serial
@client_msg_serial
end
- def resume_callbacks
- @resume_callbacks ||= []
- end
-
def create_pub_sub_message_bus
Ably::Util::PubSub.new(
coerce_into: lambda do |event|
raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message
:protocol_message
@@ -663,14 +642,10 @@
# Simply wait until the next EventMachine tick to ensure Connection initialization is complete
def when_initialized
EventMachine.next_tick { yield }
end
- def connection_resumable?
- !key.nil? && !serial.nil? && connection_state_available?
- end
-
def connection_state_available?
return true if connected?
return false if time_since_connection_confirmed_alive? > connection_state_ttl + details.max_idle_interval
@@ -680,18 +655,10 @@
else
true
end
end
- def connection_recoverable?
- connection_recover_parts
- end
-
- def connection_recover_parts
- client.recover.to_s.match(RECOVER_REGEX)
- end
-
def production?
client.environment.nil? || client.environment == :production
end
def custom_port?
@@ -738,5 +705,6 @@
end
require 'ably/realtime/connection/connection_manager'
require 'ably/realtime/connection/connection_state_machine'
require 'ably/realtime/connection/websocket_transport'
+require 'ably/realtime/recovery_key_context'