lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-1.2.4 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-1.2.6

- old
+ new

@@ -7,11 +7,13 @@ class Connection include Ably::Modules::EventEmitter include Ably::Modules::Conversions include Ably::Modules::SafeYield extend Ably::Modules::Enum + using Ably::Util::AblyExtensions + # The current {Ably::Realtime::Connection::STATE} of the connection. # Describes the realtime [Connection]{@link Connection} object states. # # @spec RTN4d # @@ -21,30 +23,30 @@ # CONNECTED A connection exists and is active. # DISCONNECTED A temporary failure condition. No current connection exists because there is no network connectivity # or no host is available. The disconnected state is entered if an established connection is dropped, # or if a connection attempt was unsuccessful. In the disconnected state the library will periodically # attempt to open a new connection (approximately every 15 seconds), anticipating that the connection - # will be re-established soon and thus connection and channel continuity will be possible. + # will be re-established soon and thus connection and channel continuity will be possible. # In this state, developers can continue to publish messages as they are automatically placed - # in a local queue, to be sent as soon as a connection is reestablished. Messages published by + # in a local queue, to be sent as soon as a connection is reestablished. Messages published by # other clients while this client is disconnected will be delivered to it upon reconnection, - # so long as the connection was resumed within 2 minutes. After 2 minutes have elapsed, recovery + # so long as the connection was resumed within 2 minutes. After 2 minutes have elapsed, recovery # is no longer possible and the connection will move to the SUSPENDED state. - # SUSPENDED A long term failure condition. No current connection exists because there is no network connectivity - # or no host is available. The suspended state is entered after a failed connection attempt if - # there has then been no connection for a period of two minutes. In the suspended state, the library - # will periodically attempt to open a new connection every 30 seconds. Developers are unable to + # SUSPENDED A long term failure condition. No current connection exists because there is no network connectivity + # or no host is available. The suspended state is entered after a failed connection attempt if + # there has then been no connection for a period of two minutes. In the suspended state, the library + # will periodically attempt to open a new connection every 30 seconds. Developers are unable to # publish messages in this state. A new connection attempt can also be triggered by an explicit - # call to {Ably::Realtime::Connection#connect}. Once the connection has been re-established, - # channels will be automatically re-attached. The client has been disconnected for too long for them - # to resume from where they left off, so if it wants to catch up on messages published by other clients + # call to {Ably::Realtime::Connection#connect}. Once the connection has been re-established, + # channels will be automatically re-attached. The client has been disconnected for too long for them + # to resume from where they left off, so if it wants to catch up on messages published by other clients # while it was disconnected, it needs to use the History API. - # CLOSING An explicit request by the developer to close the connection has been sent to the Ably service. - # If a reply is not received from Ably within a short period of time, the connection is forcibly + # CLOSING An explicit request by the developer to close the connection has been sent to the Ably service. + # If a reply is not received from Ably within a short period of time, the connection is forcibly # terminated and the connection state becomes CLOSED. - # CLOSED The connection has been explicitly closed by the client. In the closed state, no reconnection attempts - # are made automatically by the library, and clients may not publish messages. No connection state is + # CLOSED The connection has been explicitly closed by the client. In the closed state, no reconnection attempts + # are made automatically by the library, and clients may not publish messages. No connection state is # preserved by the service or by the library. A new connection attempt can be triggered by an explicit # call to {Ably::Realtime::Connection#connect}, which results in a new connection. # FAILED This state is entered if the client library encounters a failure condition that it cannot recover from. # This may be a fatal connection error received from the Ably service, for example an attempt to connect # with an incorrect API key, or a local terminal error, for example the token in use has expired @@ -75,13 +77,10 @@ include Ably::Modules::StateEmitter include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ConnectionStateChange' - # Expected format for a connection recover key - RECOVER_REGEX = /^(?<recover>[^:]+):(?<connection_serial>[^:]+):(?<msg_serial>\-?\d+)$/ - # Defaults for automatic connection recovery and timeouts DEFAULTS = { channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically @@ -111,20 +110,10 @@ # # @return [String] # attr_reader :key - # The serial number of the last message to be received on this connection, used automatically by the library when - # recovering or resuming a connection. When recovering a connection explicitly, the recoveryKey is used in - # the recover client options as it contains both the key and the last message serial. - # - # @spec RTN10 - # - # @return [Integer] - # - attr_reader :serial - # An {Ably::Models::ErrorInfo} object describing the last error received if a connection failure occurs. # # @spec RTN14a # # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException] @@ -175,29 +164,20 @@ options.each do |key, val| @defaults[key] = val if DEFAULTS.has_key?(key) end if options.kind_of?(Hash) @defaults.freeze - # If a recover client options is provided, then we need to ensure that the msgSerial matches the - # recover serial immediately at client library instantiation. This is done immediately so that any queued - # publishes use the correct serial number for these queued messages as well. - # There is no harm if the msgSerial is higher than expected if the recover fails. - recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i - if recovery_msg_serial - @client_msg_serial = recovery_msg_serial - else - reset_client_msg_serial - end - Client::IncomingMessageDispatcher.new client, self Client::OutgoingMessageDispatcher.new client, self @state_machine = ConnectionStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ConnectionManager.new(self) @current_host = client.endpoint.host + + reset_client_msg_serial end # Causes the connection to close, entering the {Ably::Realtime::Connection::STATE} CLOSING state. # Once closed, the library does not attempt to re-establish the connection without an explicit call to # {Ably::Realtime::Connection#connect}. @@ -325,11 +305,11 @@ # @return [EventMachine::Deferrable] # @api private def internet_up? url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}" EventMachine::DefaultDeferrable.new.tap do |deferrable| - EventMachine::HttpRequest.new(url, tls: { verify_peer: true }).get.tap do |http| + EventMachine::AblyHttpRequest::HttpRequest.new(url, tls: { verify_peer: true }).get.tap do |http| http.errback do yield false if block_given? deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED) end http.callback do @@ -345,45 +325,52 @@ end end end end - # The recovery key string can be used by another client to recover this connection's state in the recover client options property. See connection state recover options for more information. + # The recovery key string can be used by another client to recover this connection's state in the + # recover client options property. See connection state recover options for more information. # # @spec RTN16b, RTN16c # - # @return [String] + # @deprecated Use {#create_recovery_key} instead # def recovery_key - "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable? + logger.warn "[DEPRECATION] recovery_key is deprecated, use create_recovery_key method instead" + create_recovery_key end + # The recovery key string can be used by another client to recover this connection's state in the recover client + # options property. See connection state recover options for more information. + # + # @spec RTN16g, RTN16c + # + # @return [String] a json string which incorporates the @connectionKey@, the current @msgSerial@ and collection + # of pairs of channel @name@ and current @channelSerial@ for every currently attached channel + def create_recovery_key + if key.nil_or_empty? || state == :closing || state == :closed || state == :failed || state == :suspended + return nil #RTN16g2 + end + RecoveryKeyContext.new(key, client_msg_serial, client.channels.get_channel_serials).to_json + end + # Following a new connection being made, the connection ID, connection key - # and connection serial need to match the details provided by the server. + # need to match the details provided by the server. # # @return [void] # @api private - def configure_new(connection_id, connection_key, connection_serial) + def configure_new(connection_id, connection_key) @id = connection_id @key = connection_key - - update_connection_serial connection_serial end - # Store last received connection serial so that the connection can be resumed from the last known point-in-time - # @return [void] - # @api private - def update_connection_serial(connection_serial) - @serial = connection_serial - end - # Disable automatic resume of a connection # @return [void] # @api private def reset_resume_info @key = nil - @serial = nil + @id = nil end # @!attribute [r] __outgoing_protocol_msgbus__ # @return [Ably::Util::PubSub] Client library internal outgoing protocol message bus # @api private @@ -442,21 +429,32 @@ # @param [Ably::Models::ProtocolMessage] protocol_message # @return [void] # @api private def send_protocol_message(protocol_message) add_message_serial_if_ack_required_to(protocol_message) do - Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message| - add_message_to_outgoing_queue message - notify_message_dispatcher_of_new_message message - logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" } - end + message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger) + add_message_to_outgoing_queue(message) + notify_message_dispatcher_of_new_message message end end + def send_protocol_message_immediately(protocol_message) + message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger) + add_message_to_outgoing_queue(message, true) + notify_message_dispatcher_of_new_message message + end + # @api private - def add_message_to_outgoing_queue(protocol_message) - __outgoing_message_queue__ << protocol_message + def add_message_to_outgoing_queue(protocol_message, send_immediately = false) + if send_immediately + # Adding msg at the top of the queue to get processed immediately while connection is CONNECTED + __outgoing_message_queue__.prepend(protocol_message) + logger.debug { "Connection: protocol msg pushed at the top =>: #{protocol_message.action} #{protocol_message}" } + else + __outgoing_message_queue__ << protocol_message + logger.debug { "Connection: protocol msg queued =>: #{protocol_message.action} #{protocol_message}" } + end end # @api private def notify_message_dispatcher_of_new_message(protocol_message) __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message @@ -470,32 +468,33 @@ client.auth.auth_params.tap do |auth_deferrable| auth_deferrable.callback do |auth_params| url_params = auth_params.merge( 'format' => client.protocol, 'echo' => client.echo_messages, - 'v' => Ably::PROTOCOL_VERSION, + 'v' => Ably::PROTOCOL_VERSION, # RSC7a 'agent' => client.rest_client.agent ) # Use native websocket heartbeats if possible, but allow Ably protocol heartbeats url_params['heartbeats'] = if defaults.fetch(:websocket_heartbeats_disabled) 'true' else 'false' end - - url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? + # RSA7e1 + url_params['clientId'] = client.auth.client_id_for_request_sync if client.auth.client_id_for_request_sync url_params.merge!(client.transport_params) - if connection_resumable? - url_params.merge! resume: key, connection_serial: serial - logger.debug { "Resuming connection key #{key} with serial #{serial}" } - elsif connection_recoverable? - url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial] - logger.debug { "Recovering connection with key #{client.recover}" } - unsafe_once(:connected, :closed, :failed) do - client.disable_automatic_connection_recovery + if !key.nil_or_empty? and connection_state_available? + url_params.merge! resume: key + logger.debug { "Resuming connection with key #{key}" } + elsif !client.recover.nil_or_empty? + recovery_context = RecoveryKeyContext.from_json(client.recover, logger) + unless recovery_context.nil? + key = recovery_context.connection_key + logger.debug { "Recovering connection with key #{key}" } + url_params.merge! recover: key end end url = URI(client.endpoint).tap do |endpoint| endpoint.query = URI.encode_www_form(url_params) @@ -542,28 +541,10 @@ # @api private def set_connection_details(connection_details) @details = connection_details end - # Executes registered callbacks for a successful connection resume event - # @api private - def trigger_resumed - resume_callbacks.each(&:call) - end - - # Provides a simple hook to inject a callback when a connection is successfully resumed - # @api private - def on_resume(&callback) - resume_callbacks << callback - end - - # Remove a registered connection resume callback - # @api private - def off_resume(&callback) - resume_callbacks.delete(callback) - end - # Returns false if messages cannot be published as a result of message queueing being disabled # @api private def can_publish_messages? connected? || ( (initialized? || connecting? || disconnected?) && client.queue_messages ) @@ -598,10 +579,16 @@ # @api private def reset_client_msg_serial @client_msg_serial = -1 end + # Sets the client message serial from recover clientOption. + # @api private + def set_msg_serial_from_recover=(value) + @client_msg_serial = value + end + # When a hearbeat or any other message from Ably is received # we know it's alive, see #RTN23 # @api private def set_connection_confirmed_alive @last_liveness_event = Time.now @@ -618,24 +605,16 @@ private :change_state private # The client message serial (msgSerial) is incremented for every message that is published that requires an ACK. - # Note that this is different to the connection serial that contains the last known serial number - # received from the server. - # # A message serial number does not guarantee a message has been received, only sent. - # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes. # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent def client_msg_serial @client_msg_serial end - def resume_callbacks - @resume_callbacks ||= [] - end - def create_pub_sub_message_bus Ably::Util::PubSub.new( coerce_into: lambda do |event| raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message :protocol_message @@ -663,14 +642,10 @@ # Simply wait until the next EventMachine tick to ensure Connection initialization is complete def when_initialized EventMachine.next_tick { yield } end - def connection_resumable? - !key.nil? && !serial.nil? && connection_state_available? - end - def connection_state_available? return true if connected? return false if time_since_connection_confirmed_alive? > connection_state_ttl + details.max_idle_interval @@ -680,18 +655,10 @@ else true end end - def connection_recoverable? - connection_recover_parts - end - - def connection_recover_parts - client.recover.to_s.match(RECOVER_REGEX) - end - def production? client.environment.nil? || client.environment == :production end def custom_port? @@ -738,5 +705,6 @@ end require 'ably/realtime/connection/connection_manager' require 'ably/realtime/connection/connection_state_machine' require 'ably/realtime/connection/websocket_transport' +require 'ably/realtime/recovery_key_context'