lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.8.5 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.8.6
- old
+ new
@@ -58,10 +58,18 @@
ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'
# Expected format for a connection recover key
RECOVER_REGEX = /^(?<recover>[\w-]+):(?<connection_serial>\-?\w+)$/
+ # Defaults for automatic connection recovery and timeouts
+ DEFAULTS = {
+ disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
+ suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
+ connection_state_ttl: 60, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
+ realtime_request_timeout: 10 # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
+ }.freeze
+
# A unique public identifier for this connection, used to identify this member in presence events and messages
# @return [String]
attr_reader :id
# A unique private connection key used to recover this connection, assigned by Ably
@@ -88,27 +96,39 @@
# The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
# @return [Ably::Realtime::Connection::ConnectionManager]
# @api private
attr_reader :manager
- # An internal queue used to manage unsent outgoing messages. You should never interface with this array directly
+ # An internal queue used to manage unsent outgoing messages. You should never interface with this array directly
# @return [Array]
# @api private
attr_reader :__outgoing_message_queue__
- # An internal queue used to manage sent messages. You should never interface with this array directly
+ # An internal queue used to manage sent messages. You should never interface with this array directly
# @return [Array]
# @api private
attr_reader :__pending_message_ack_queue__
+ # Configured recovery and timeout defaults for this {Connection}.
+ # See the configurable options in {Ably::Realtime::Client#initialize}.
+ # The defaults are immutable
+ # @return [Hash]
+ attr_reader :defaults
+
# @api public
- def initialize(client)
+ def initialize(client, options)
@client = client
@client_serial = -1
@__outgoing_message_queue__ = []
@__pending_message_ack_queue__ = []
+ @defaults = DEFAULTS.dup
+ options.each do |key, val|
+ @defaults[key] = val if DEFAULTS.has_key?(key)
+ end if options.kind_of?(Hash)
+ @defaults.freeze
+
Client::IncomingMessageDispatcher.new client, self
Client::OutgoingMessageDispatcher.new client, self
@state_machine = ConnectionStateMachine.new(self)
@state = STATE(state_machine.current_state)
@@ -131,28 +151,51 @@
deferrable_for_state_change_to(STATE.Closed, &success_block)
end
# Causes the library to attempt connection. If it was previously explicitly
# closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened.
+ # Succeeds when connection is established i.e. state is @Connected@
+ # Fails when state becomes either @Closing@, @Closed@ or @Failed@
#
+ # Note that if the connection remains in the disconnected ans suspended states indefinitely,
+ # the Deferrable or block provided may never be called
+ #
# @yield block is called as soon as this connection is in the Connected state
#
# @return [EventMachine::Deferrable]
#
def connect(&success_block)
unless connecting? || connected?
raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
transition_state_machine :connecting
end
- deferrable_for_state_change_to(STATE.Connected, &success_block)
+
+ Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
+ deferrable.callback do
+ yield if block_given?
+ end
+ succeed_callback = deferrable.method(:succeed)
+ fail_callback = deferrable.method(:fail)
+
+ once(:connected) do
+ deferrable.succeed
+ off &fail_callback
+ end
+
+ once(:failed, :closed, :closing) do
+ deferrable.fail
+ off &succeed_callback
+ end
+ end
end
# Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server.
# This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently.
# The elapsed milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state.
#
- # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds
+ # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds.
+ # If the ping is not received within an acceptable timeframe, the block will be called with +nil+ as he first argument
#
# @example
# client = Ably::Rest::Client.new(key: 'key.id:secret')
# client.connection.ping do |ms_elapsed|
# puts "Ping took #{ms_elapsed}ms"
@@ -163,24 +206,36 @@
def ping(&block)
raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?
started = nil
+ finished = false
wait_for_ping = Proc.new do |protocol_message|
+ next if finished
if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
+ finished = true
__incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
safe_yield block, time_passed if block_given?
end
end
once_or_if(STATE.Connected) do
+ next if finished
started = Time.now
send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
__incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
end
+
+ EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
+ next if finished
+ finished = true
+ __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
+ logger.warn "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
+ safe_yield block, nil if block_given?
+ end
end
# @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN
# @return [EventMachine::Deferrable]
# @api private
@@ -325,13 +380,14 @@
EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
# Getting auth params can be blocking so uses a Deferrable
client.auth.auth_params.tap do |auth_deferrable|
auth_deferrable.callback do |auth_params|
url_params = auth_params.merge(
- timestamp: as_since_epoch(Time.now),
format: client.protocol,
echo: client.echo_messages
)
+
+ url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?
if connection_resumable?
url_params.merge! resume: key, connection_serial: serial
logger.debug "Resuming connection key #{key} with serial #{serial}"
elsif connection_recoverable?