lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.9.3 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-1.0.0

- old
+ new

@@ -1,5 +1,7 @@ +require 'securerandom' + module Ably module Realtime # The Connection class represents the connection associated with an Ably Realtime instance. # The Connection object exposes the lifecycle and parameters of the realtime connection. # @@ -23,12 +25,10 @@ # Connection::STATE.Suspended # Connection::STATE.Closing # Connection::STATE.Closed # Connection::STATE.Failed # - # Connection emit errors - use `on(:error)` to subscribe to errors - # # @example # client = Ably::Realtime::Client.new('key.id:secret') # client.connection.on(:connected) do # puts "Connected with connection ID: #{client.connection.id}" # end @@ -40,35 +40,45 @@ include Ably::Modules::EventEmitter include Ably::Modules::Conversions include Ably::Modules::SafeYield extend Ably::Modules::Enum - # Valid Connection states + # ConnectionState + # The permited states for this connection STATE = ruby_enum('STATE', :initialized, :connecting, :connected, :disconnected, :suspended, :closing, :closed, :failed ) + + # ConnectionEvent + # The permitted connection events that are emitted for this connection + EVENT = ruby_enum('EVENT', + STATE.to_sym_arr + [:update] + ) + include Ably::Modules::StateEmitter include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ConnectionStateChange' # Expected format for a connection recover key RECOVER_REGEX = /^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/ # Defaults for automatic connection recovery and timeouts DEFAULTS = { + channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically connection_state_ttl: 120, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected - max_connection_state_ttl: nil, # allow a max TTL to be passed in for CI test purposes thus overiding any connection_state_ttl sent from Ably + max_connection_state_ttl: nil, # allow a max TTL to be passed in, usually for CI test purposes thus overiding any connection_state_ttl sent from Ably realtime_request_timeout: 10, # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage + websocket_heartbeats_disabled: false, }.freeze # A unique public identifier for this connection, used to identify this member in presence events and messages # @return [String] attr_reader :id @@ -120,13 +130,13 @@ attr_reader :defaults # @api public def initialize(client, options) @client = client - @client_serial = -1 @__outgoing_message_queue__ = [] @__pending_message_ack_queue__ = [] + reset_client_serial @defaults = DEFAULTS.dup options.each do |key, val| @defaults[key] = val if DEFAULTS.has_key?(key) end if options.kind_of?(Hash) @@ -148,11 +158,13 @@ # # @return [EventMachine::Deferrable] # def close(&success_block) unless closing? || closed? - raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing) + unless can_transition_to?(:closing) + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:closing)) + end transition_state_machine :closing end deferrable_for_state_change_to(STATE.Closed, &success_block) end @@ -168,27 +180,30 @@ # # @return [EventMachine::Deferrable] # def connect(&success_block) unless connecting? || connected? - raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting) - transition_state_machine :connecting + unless can_transition_to?(:connecting) + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:connecting)) + end + # If connect called in a suspended block, we want to ensure the other callbacks have finished their work first + EventMachine.next_tick { transition_state_machine :connecting if can_transition_to?(:connecting) } end Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| deferrable.callback do yield if block_given? end succeed_callback = deferrable.method(:succeed) fail_callback = deferrable.method(:fail) - once(:connected) do + unsafe_once(:connected) do deferrable.succeed off(&fail_callback) end - once(:failed, :closed, :closing) do + unsafe_once(:failed, :closed, :closing) do deferrable.fail off(&succeed_callback) end end end @@ -204,43 +219,58 @@ # client = Ably::Rest::Client.new(key: 'key.id:secret') # client.connection.ping do |elapsed_s| # puts "Ping took #{elapsed_s}s" # end # - # @return [void] + # @return [Ably::Util::SafeDeferrable] # def ping(&block) - raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized? - raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed? + if initialized? || suspended? || closing? || closed? || failed? + error = Ably::Models::ErrorInfo.new(message: "Cannot send a ping when the connection is #{state}", code: 80003) + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) + end - started = nil - finished = false + Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| + started = nil + finished = false + ping_id = SecureRandom.hex(16) + heartbeat_action = Ably::Models::ProtocolMessage::ACTION.Heartbeat - wait_for_ping = Proc.new do |protocol_message| - next if finished - if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat + wait_for_ping = Proc.new do |protocol_message| + next if finished + if protocol_message.action == heartbeat_action && protocol_message.id == ping_id + finished = true + __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping) + time_passed = Time.now.to_f - started.to_f + deferrable.succeed time_passed + safe_yield block, time_passed if block_given? + end + end + + once_or_if(STATE.Connected) do + next if finished + started = Time.now + send_protocol_message action: heartbeat_action.to_i, id: ping_id + __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping + end + + once_or_if([:suspended, :closing, :closed, :failed]) do + next if finished finished = true + deferrable.fail Ably::Models::ErrorInfo.new(message: "Ping failed as connection has changed state to #{state}", code: 80003) + end + + EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do + next if finished + finished = true __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping) - time_passed = Time.now.to_f - started.to_f - safe_yield block, time_passed if block_given? + error_msg = "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s" + logger.warn { error_msg } + deferrable.fail Ably::Models::ErrorInfo.new(message: error_msg, code: 50003) + safe_yield block, nil if block_given? end end - - once_or_if(STATE.Connected) do - next if finished - started = Time.now - send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i - __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping - end - - EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do - next if finished - finished = true - __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping) - logger.warn "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s" - safe_yield block, nil if block_given? - end end # @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN # @return [EventMachine::Deferrable] # @api private @@ -362,11 +392,11 @@ def send_protocol_message(protocol_message) add_message_serial_if_ack_required_to(protocol_message) do Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message| add_message_to_outgoing_queue message notify_message_dispatcher_of_new_message message - logger.debug("Connection: Prot msg queued =>: #{message.action} #{message}") + logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" } end end end # @api private @@ -385,36 +415,39 @@ EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable| # Getting auth params can be blocking so uses a Deferrable client.auth.auth_params.tap do |auth_deferrable| auth_deferrable.callback do |auth_params| url_params = auth_params.merge( - format: client.protocol, - echo: client.echo_messages, - v: Ably::PROTOCOL_VERSION, - lib: client.rest_client.lib_version_id, + format: client.protocol, + echo: client.echo_messages, + v: Ably::PROTOCOL_VERSION, + lib: client.rest_client.lib_version_id, ) + # Use native websocket heartbeats if possible + url_params['heartbeats'] = 'false' unless defaults.fetch(:websocket_heartbeats_disabled) + url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? if connection_resumable? url_params.merge! resume: key, connection_serial: serial - logger.debug "Resuming connection key #{key} with serial #{serial}" + logger.debug { "Resuming connection key #{key} with serial #{serial}" } elsif connection_recoverable? - url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial] - logger.debug "Recovering connection with key #{client.recover}" - once(:connected, :closed, :failed) do + url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial] + logger.debug { "Recovering connection with key #{client.recover}" } + unsafe_once(:connected, :closed, :failed) do client.disable_automatic_connection_recovery end end url = URI(client.endpoint).tap do |endpoint| endpoint.query = URI.encode_www_form(url_params) end determine_host do |host| begin - logger.debug "Connection: Opening socket connection to #{host}:#{port}/#{url.path}?#{url.query}" + logger.debug { "Connection: Opening socket connection to #{host}:#{port}/#{url.path}?#{url.query}" } @transport = create_transport(host, port, url) do |websocket_transport| websocket_deferrable.succeed websocket_transport end rescue EventMachine::ConnectionError => error websocket_deferrable.fail error @@ -449,11 +482,11 @@ @details = connection_details end # Executes registered callbacks for a successful connection resume event # @api private - def resumed + def trigger_resumed resume_callbacks.each(&:call) end # Provides a simple hook to inject a callback when a connection is successfully resumed # @api private @@ -488,22 +521,48 @@ def connection_state_ttl=(val) @connection_state_ttl = val end + # @api private + def heartbeat_interval + # See RTN23a + (details && details.max_idle_interval).to_i + + defaults.fetch(:realtime_request_timeout) + end + + # Resets the client serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage} + # (see #client_serial) + # @api private + def reset_client_serial + @client_serial = -1 + end + + # When a hearbeat or any other message from Ably is received + # we know it's alive, see #RTN23 + # @api private + def set_connection_confirmed_alive + @last_liveness_event = Time.now + manager.reset_liveness_timer + end + + # @api private + def time_since_connection_confirmed_alive? + Time.now.to_i - @last_liveness_event.to_i + end + # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state private # The client serial is incremented for every message that is published that requires an ACK. # Note that this is different to the connection serial that contains the last known serial number # received from the server. # - # A client serial number therefore does not guarantee a message has been received, only sent. - # A connection serial guarantees the server has received the message and is thus used for connection - # recovery and resumes. + # A message serial number does not guarantee a message has been received, only sent. + # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes. # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent def client_serial @client_serial end