lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-1.1.0 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-1.1.2.rc1
- old
+ new
@@ -64,11 +64,11 @@
include Ably::Modules::StateEmitter
include Ably::Modules::UsesStateMachine
ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'
# Expected format for a connection recover key
- RECOVER_REGEX = /^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/
+ RECOVER_REGEX = /^(?<recover>[^:]+):(?<connection_serial>[^:]+):(?<msg_serial>\-?\d+)$/
# Defaults for automatic connection recovery and timeouts
DEFAULTS = {
channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
@@ -135,24 +135,36 @@
# @api public
def initialize(client, options)
@client = client
@__outgoing_message_queue__ = []
@__pending_message_ack_queue__ = []
- reset_client_serial
@defaults = DEFAULTS.dup
options.each do |key, val|
@defaults[key] = val if DEFAULTS.has_key?(key)
end if options.kind_of?(Hash)
@defaults.freeze
+ # If a recover client options is provided, then we need to ensure that the msgSerial matches the
+ # recover serial immediately at client library instantiation. This is done immediately so that any queued
+ # publishes use the correct serial number for these queued messages as well.
+ # There is no harm if the msgSerial is higher than expected if the recover fails.
+ recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i
+ if recovery_msg_serial
+ @client_msg_serial = recovery_msg_serial
+ else
+ reset_client_msg_serial
+ end
+
Client::IncomingMessageDispatcher.new client, self
Client::OutgoingMessageDispatcher.new client, self
@state_machine = ConnectionStateMachine.new(self)
@state = STATE(state_machine.current_state)
@manager = ConnectionManager.new(self)
+
+ @current_host = client.endpoint.host
end
# Causes the connection to close, entering the closed state, from any state except
# the failed state. Once closed, the library will not attempt to re-establish the
# connection without a call to {Connection#connect}.
@@ -301,22 +313,21 @@
end
# @!attribute [r] recovery_key
# @return [String] recovery key that can be used by another client to recover this connection with the :recover option
def recovery_key
- "#{key}:#{serial}" if connection_resumable?
+ "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable?
end
# Following a new connection being made, the connection ID, connection key
- # and message serial need to match the details provided by the server.
+ # and connection serial need to match the details provided by the server.
#
# @return [void]
# @api private
def configure_new(connection_id, connection_key, connection_serial)
@id = connection_id
@key = connection_key
- @client_serial = connection_serial
update_connection_serial connection_serial
end
# Store last received connection serial so that the connection can be resumed from the last known point-in-time
@@ -540,15 +551,15 @@
# See RTN23a
(details && details.max_idle_interval).to_i +
defaults.fetch(:realtime_request_timeout)
end
- # Resets the client serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage}
- # (see #client_serial)
+ # Resets the client message serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage}
+ # (see #client_msg_serial)
# @api private
- def reset_client_serial
- @client_serial = -1
+ def reset_client_msg_serial
+ @client_msg_serial = -1
end
# When a hearbeat or any other message from Ably is received
# we know it's alive, see #RTN23
# @api private
@@ -566,19 +577,19 @@
# #transition_state_machine must be used instead
private :change_state
private
- # The client serial is incremented for every message that is published that requires an ACK.
+ # The client message serial (msgSerial) is incremented for every message that is published that requires an ACK.
# Note that this is different to the connection serial that contains the last known serial number
# received from the server.
#
# A message serial number does not guarantee a message has been received, only sent.
# A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes.
# @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
- def client_serial
- @client_serial
+ def client_msg_serial
+ @client_msg_serial
end
def resume_callbacks
@resume_callbacks ||= []
end
@@ -599,14 +610,14 @@
yield
end
end
def add_message_serial_to(protocol_message)
- @client_serial += 1
- protocol_message[:msgSerial] = client_serial
+ @client_msg_serial += 1
+ protocol_message[:msgSerial] = client_msg_serial
yield
rescue StandardError => e
- @client_serial -= 1
+ @client_msg_serial -= 1
raise e
end
# Simply wait until the next EventMachine tick to ensure Connection initialization is complete
def when_initialized