lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.8.5 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.8.6

- old
+ new

@@ -58,10 +58,18 @@ ensure_state_machine_emits 'Ably::Models::ConnectionStateChange' # Expected format for a connection recover key RECOVER_REGEX = /^(?<recover>[\w-]+):(?<connection_serial>\-?\w+)$/ + # Defaults for automatic connection recovery and timeouts + DEFAULTS = { + disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically + suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically + connection_state_ttl: 60, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected + realtime_request_timeout: 10 # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage + }.freeze + # A unique public identifier for this connection, used to identify this member in presence events and messages # @return [String] attr_reader :id # A unique private connection key used to recover this connection, assigned by Ably @@ -88,27 +96,39 @@ # The Connection manager responsible for creating, maintaining and closing the connection and underlying transport # @return [Ably::Realtime::Connection::ConnectionManager] # @api private attr_reader :manager - # An internal queue used to manage unsent outgoing messages. You should never interface with this array directly + # An internal queue used to manage unsent outgoing messages. You should never interface with this array directly # @return [Array] # @api private attr_reader :__outgoing_message_queue__ - # An internal queue used to manage sent messages. You should never interface with this array directly + # An internal queue used to manage sent messages. You should never interface with this array directly # @return [Array] # @api private attr_reader :__pending_message_ack_queue__ + # Configured recovery and timeout defaults for this {Connection}. + # See the configurable options in {Ably::Realtime::Client#initialize}. + # The defaults are immutable + # @return [Hash] + attr_reader :defaults + # @api public - def initialize(client) + def initialize(client, options) @client = client @client_serial = -1 @__outgoing_message_queue__ = [] @__pending_message_ack_queue__ = [] + @defaults = DEFAULTS.dup + options.each do |key, val| + @defaults[key] = val if DEFAULTS.has_key?(key) + end if options.kind_of?(Hash) + @defaults.freeze + Client::IncomingMessageDispatcher.new client, self Client::OutgoingMessageDispatcher.new client, self @state_machine = ConnectionStateMachine.new(self) @state = STATE(state_machine.current_state) @@ -131,28 +151,51 @@ deferrable_for_state_change_to(STATE.Closed, &success_block) end # Causes the library to attempt connection. If it was previously explicitly # closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened. + # Succeeds when connection is established i.e. state is @Connected@ + # Fails when state becomes either @Closing@, @Closed@ or @Failed@ # + # Note that if the connection remains in the disconnected ans suspended states indefinitely, + # the Deferrable or block provided may never be called + # # @yield block is called as soon as this connection is in the Connected state # # @return [EventMachine::Deferrable] # def connect(&success_block) unless connecting? || connected? raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting) transition_state_machine :connecting end - deferrable_for_state_change_to(STATE.Connected, &success_block) + + Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| + deferrable.callback do + yield if block_given? + end + succeed_callback = deferrable.method(:succeed) + fail_callback = deferrable.method(:fail) + + once(:connected) do + deferrable.succeed + off &fail_callback + end + + once(:failed, :closed, :closing) do + deferrable.fail + off &succeed_callback + end + end end # Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server. # This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently. # The elapsed milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state. # - # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds + # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds. + # If the ping is not received within an acceptable timeframe, the block will be called with +nil+ as he first argument # # @example # client = Ably::Rest::Client.new(key: 'key.id:secret') # client.connection.ping do |ms_elapsed| # puts "Ping took #{ms_elapsed}ms" @@ -163,24 +206,36 @@ def ping(&block) raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized? raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed? started = nil + finished = false wait_for_ping = Proc.new do |protocol_message| + next if finished if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat + finished = true __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping) time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i safe_yield block, time_passed if block_given? end end once_or_if(STATE.Connected) do + next if finished started = Time.now send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping end + + EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do + next if finished + finished = true + __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping) + logger.warn "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s" + safe_yield block, nil if block_given? + end end # @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN # @return [EventMachine::Deferrable] # @api private @@ -325,13 +380,14 @@ EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable| # Getting auth params can be blocking so uses a Deferrable client.auth.auth_params.tap do |auth_deferrable| auth_deferrable.callback do |auth_params| url_params = auth_params.merge( - timestamp: as_since_epoch(Time.now), format: client.protocol, echo: client.echo_messages ) + + url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? if connection_resumable? url_params.merge! resume: key, connection_serial: serial logger.debug "Resuming connection key #{key} with serial #{serial}" elsif connection_recoverable?