lib/ably/realtime/presence/members_map.rb in ably-0.8.15 vs lib/ably/realtime/presence/members_map.rb in ably-1.0.0
- old
+ new
@@ -18,23 +18,36 @@
include Enumerable
extend Ably::Modules::Enum
STATE = ruby_enum('STATE',
:initialized,
- :sync_starting,
+ :sync_starting, # Indicates the client is waiting for SYNC ProtocolMessages from Ably
+ :sync_none, # Indicates the ATTACHED ProtocolMessage had no presence flag and thus no members on the channel
+ :finalizing_sync,
:in_sync,
:failed
)
include Ably::Modules::StateEmitter
def initialize(presence)
@presence = presence
- @state = STATE(:initialized)
- @members = Hash.new
+ @state = STATE(:initialized)
+
+ # Two sets of members maintained
+ # @members contains all members present on the channel
+ # @local_members contains only this connection's members for the purpose of re-entering the member if channel continuity is lost
+ reset_members
+ reset_local_members
+
@absent_member_cleanup_queue = []
+ # Each SYNC session has a unique ID so that following SYNC
+ # any members present in the map without this session ID are
+ # not present according to Ably, see #RTP19
+ @sync_session_id = -1
+
setup_event_handlers
end
# When attaching to a channel that has members present, the server
# initiates a sync automatically so that the client has a complete list of members.
@@ -52,26 +65,35 @@
# @return [void]
#
# @api private
def update_sync_serial(serial)
@sync_serial = serial
- change_state :in_sync if sync_serial_cursor_at_end?
end
+ # When channel serial in ProtocolMessage SYNC is nil or
+ # an empty cursor appears after the ':' such as 'cf30e75054887:psl_7g:client:189'.
+ # That is an indication that there are no more SYNC messages.
+ #
+ # @api private
+ #
+ def sync_serial_cursor_at_end?
+ sync_serial.nil? || sync_serial.to_s.match(/^[\w-]+:?$/)
+ end
+
# Get the list of presence members
#
# @param [Hash,String] options an options Hash to filter members
# @option options [String] :client_id optional client_id filter for the member
# @option options [String] :connection_id optional connection_id filter for the member
- # @option options [String] :wait_for_sync defaults to false, if true the get method waits for the initial presence sync following channel attachment to complete before returning the members present
+ # @option options [String] :wait_for_sync defaults to true, if true the get method waits for the initial presence sync following channel attachment to complete before returning the members present, else it immediately returns the members present currently
#
# @yield [Array<Ably::Models::PresenceMessage>] array of present members
#
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
#
def get(options = {}, &block)
- wait_for_sync = options.fetch(:wait_for_sync, false)
+ wait_for_sync = options.fetch(:wait_for_sync, true)
deferrable = Ably::Util::SafeDeferrable.new(logger)
result_block = proc do
present_members.tap do |members|
members.keep_if { |member| member.connection_id == options[:connection_id] } if options[:connection_id]
@@ -102,13 +124,13 @@
off(&in_sync_callback)
off(&failed_callback)
channel.off(&failed_callback)
end
- once(:in_sync, &in_sync_callback)
+ unsafe_once(:in_sync, &in_sync_callback)
+ unsafe_once(:failed, &failed_callback)
- once(:failed, &failed_callback)
channel.unsafe_once(:detaching, :detached, :failed) do |error_reason|
failed_callback.call error_reason
end
end
@@ -128,11 +150,23 @@
def each(&block)
return to_enum(:each) unless block_given?
present_members.each(&block)
end
+ # A copy of the local members present i.e. members entered from this connection
+ # and thus the responsibility of this library to re-enter on the channel automatically if the
+ # channel loses continuity
+ #
+ # @return [Array<PresenceMessage>]
+ # @api private
+ def local_members
+ @local_members
+ end
+
private
+ attr_reader :sync_session_id
+
def members
@members
end
def sync_serial
@@ -145,10 +179,18 @@
def absent_member_cleanup_queue
@absent_member_cleanup_queue
end
+ def reset_members
+ @members = Hash.new
+ end
+
+ def reset_local_members
+ @local_members = Hash.new
+ end
+
def channel
presence.channel
end
def client
@@ -163,46 +205,106 @@
client.connection
end
def setup_event_handlers
presence.__incoming_msgbus__.subscribe(:presence, :sync) do |presence_message|
- presence_message.decode channel
+ presence_message.decode(client.encoders, channel.options) do |encode_error, error_message|
+ client.logger.error error_message
+ end
update_members_and_emit_events presence_message
end
+ channel.unsafe_on(:failed, :detached) do
+ reset_members
+ reset_local_members
+ end
+
resume_sync_proc = method(:resume_sync).to_proc
- connection.on_resume(&resume_sync_proc)
- once(:in_sync, :failed) do
- connection.off_resume(&resume_sync_proc)
+
+ unsafe_on(:sync_starting) do
+ @sync_session_id += 1
+
+ channel.unsafe_once(:attached) do
+ connection.on_resume(&resume_sync_proc)
+ end
+
+ unsafe_once(:in_sync, :failed) do
+ connection.off_resume(&resume_sync_proc)
+ end
end
- once(:in_sync) do
+ unsafe_on(:sync_none) do
+ @sync_session_id += 1
+ # Immediately change to finalizing which will result in all members being cleaned up
+ change_state :finalizing_sync
+ end
+
+ unsafe_on(:finalizing_sync) do
clean_up_absent_members
+ clean_up_members_not_present_in_sync
+ change_state :in_sync
end
+
+ unsafe_on(:in_sync) do
+ update_local_member_state
+ end
end
+ # Listen for events that change the PresenceMap state and thus
+ # need to be replicated to the local member set
+ def update_local_member_state
+ new_local_members = members.select do |member_key, member|
+ member.fetch(:message).connection_id == connection.id
+ end.each_with_object({}) do |(member_key, member), hash_object|
+ hash_object[member_key] = member.fetch(:message)
+ end
+
+ @local_members.reject do |member_key, message|
+ new_local_members.keys.include?(member_key)
+ end.each do |member_key, message|
+ re_enter_local_member_missing_from_presence_map message
+ end
+
+ @local_members = new_local_members
+ end
+
+ def re_enter_local_member_missing_from_presence_map(presence_message)
+ local_client_id = presence_message.client_id || client.auth.client_id
+ logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{presence_message.data}" }
+ presence.enter_client(local_client_id, presence_message.data).tap do |deferrable|
+ deferrable.errback do |error|
+ presence_message_client_id = presence_message.client_id || client.auth.client_id
+ re_enter_error = Ably::Models::ErrorInfo.new(
+ message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'",
+ code: 91004
+ )
+ channel.emit :update, Ably::Models::ChannelStateChange.new(
+ current: channel.state,
+ previous: channel.state,
+ event: Ably::Realtime::Channel::EVENT(:update),
+ reason: re_enter_error,
+ resumed: true
+ )
+ end
+ end
+ end
+
# Trigger a manual SYNC operation to resume member synchronisation from last known cursor position
def resume_sync
connection.send_protocol_message(
action: Ably::Models::ProtocolMessage::ACTION.Sync.to_i,
channel: channel.name,
channel_serial: sync_serial
- )
+ ) if channel.attached?
end
- # When channel serial in ProtocolMessage SYNC is nil or
- # an empty cursor appears after the ':' such as 'cf30e75054887:psl_7g:client:189'.
- # That is an indication that there are no more SYNC messages.
- def sync_serial_cursor_at_end?
- sync_serial.nil? || sync_serial.to_s.match(/^[\w-]+:?$/)
- end
-
def update_members_and_emit_events(presence_message)
return unless ensure_presence_message_is_valid(presence_message)
unless should_update_member?(presence_message)
- logger.debug "#{self.class.name}: Skipped presence member #{presence_message.action} on channel #{presence.channel.name}.\n#{presence_message.to_safe_json}"
+ logger.debug { "#{self.class.name}: Skipped presence member #{presence_message.action} on channel #{presence.channel.name}.\n#{presence_message.to_json}" }
+ touch_presence_member presence_message
return
end
case presence_message.action
when Ably::Models::PresenceMessage::ACTION.Enter, Ably::Models::PresenceMessage::ACTION.Update, Ably::Models::PresenceMessage::ACTION.Present
@@ -216,49 +318,98 @@
def ensure_presence_message_is_valid(presence_message)
return true if presence_message.connection_id
error = Ably::Exceptions::ProtocolError.new("Protocol error, presence message is missing connectionId", 400, 80013)
- logger.error "PresenceMap: On channel '#{channel.name}' error: #{error}"
- channel.emit :error, error
+ logger.error { "PresenceMap: On channel '#{channel.name}' error: #{error}" }
end
# If the message received is older than the last known event for presence
- # then skip. This can occur during a SYNC operation. For example:
+ # then skip (return false). This can occur during a SYNC operation. For example:
# - SYNC starts
# - LEAVE event received for clientId 5
# - SYNC present even received for clientId 5 with a timestamp before LEAVE event because the LEAVE occured before the SYNC operation completed
#
- # @return [Boolean]
+ # @return [Boolean] true when +new_message+ is newer than the existing member in the PresenceMap
#
- def should_update_member?(presence_message)
- if members[presence_message.member_key]
- members[presence_message.member_key].fetch(:message).timestamp < presence_message.timestamp
+ def should_update_member?(new_message)
+ if members[new_message.member_key]
+ existing_message = members[new_message.member_key].fetch(:message)
+
+ # If both are messages published by clients (not fabricated), use the ID to determine newness, see #RTP2b2
+ if new_message.id.start_with?(new_message.connection_id) && existing_message.id.start_with?(existing_message.connection_id)
+ new_message_parts = new_message.id.match(/(\d+):(\d+)$/)
+ existing_message_parts = existing_message.id.match(/(\d+):(\d+)$/)
+
+ if !new_message_parts || !existing_message_parts
+ logger.fatal { "#{self.class.name}: Message IDs for new message #{new_message.id} or old message #{existing_message.id} are invalid. \nNew message: #{new_message.to_json}" }
+ return existing_message.timestamp < new_message.timestamp
+ end
+
+ # ID is in the format "connid:msgSerial:index" such as "aaaaaa:0:0"
+ # if msgSerial is greater then the new_message should update the member
+ # if msgSerial is equal and index is greater, then update the member
+ if new_message_parts[1].to_i > existing_message_parts[1].to_i # msgSerial
+ true
+ elsif new_message_parts[1].to_i == existing_message_parts[1].to_i # msgSerial equal
+ new_message_parts[2].to_i > existing_message_parts[2].to_i # compare index
+ else
+ false
+ end
+ else
+ # This message is fabricated or could not be validated so rely on timestamps, see #RTP2b1
+ new_message.timestamp > existing_message.timestamp
+ end
else
true
end
end
def add_presence_member(presence_message)
- logger.debug "#{self.class.name}: Member '#{presence_message.member_key}' for event '#{presence_message.action}' #{members.has_key?(presence_message.member_key) ? 'updated' : 'added'}.\n#{presence_message.to_safe_json}"
- members[presence_message.member_key] = { present: true, message: presence_message }
+ logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' for event '#{presence_message.action}' #{members.has_key?(presence_message.member_key) ? 'updated' : 'added'}.\n#{presence_message.to_json}" }
+ # Mutate the PresenceMessage so that the action is :present, see #RTP2d
+ present_presence_message = presence_message.shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Present)
+ member_set_upsert present_presence_message, true
presence.emit_message presence_message.action, presence_message
end
def remove_presence_member(presence_message)
- logger.debug "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_safe_json}"
+ logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" }
if in_sync?
- members.delete presence_message.member_key
+ member_set_delete presence_message
else
- members[presence_message.member_key] = { present: false, message: presence_message }
- absent_member_cleanup_queue << presence_message.member_key
+ member_set_upsert presence_message, false
+ absent_member_cleanup_queue << presence_message
end
presence.emit_message presence_message.action, presence_message
end
+ # No update is necessary for this member as older / no change during update
+ # however we need to update the sync_session_id so that this member is not removed following SYNC
+ def touch_presence_member(presence_message)
+ members.fetch(presence_message.member_key)[:sync_session_id] = sync_session_id
+ end
+
+ def member_set_upsert(presence_message, present)
+ members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id }
+ if presence_message.connection_id == connection.id
+ local_members[presence_message.member_key] = presence_message
+ logger.debug { "#{self.class.name}: Local member '#{presence_message.member_key}' added" }
+ end
+ end
+
+ def member_set_delete(presence_message)
+ members.delete presence_message.member_key
+ if in_sync?
+ # If not in SYNC, then local members missing may need to be re-entered
+ # Let #update_local_member_state handle missing members
+ local_members.delete presence_message.member_key
+ end
+ end
+
def present_members
members.select do |key, presence|
presence.fetch(:present)
end.map do |key, presence|
presence.fetch(:message)
@@ -272,10 +423,24 @@
presence.fetch(:message)
end
end
def clean_up_absent_members
- members.delete absent_member_cleanup_queue.shift
+ while member_to_remove = absent_member_cleanup_queue.shift
+ logger.debug { "#{self.class.name}: Cleaning up absent member '#{member_to_remove.member_key}' after SYNC.\n#{member_to_remove.to_json}" }
+ member_set_delete member_to_remove
+ end
+ end
+
+ def clean_up_members_not_present_in_sync
+ members.select do |member_key, member|
+ member.fetch(:sync_session_id) != sync_session_id
+ end.each do |member_key, member|
+ presence_message = member.fetch(:message).shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Leave, id: nil)
+ logger.debug { "#{self.class.name}: Fabricating a LEAVE event for member '#{presence_message.member_key}' was not present in recently completed SYNC session ID '#{sync_session_id}'.\n#{presence_message.to_json}" }
+ member_set_delete member.fetch(:message)
+ presence.emit_message Ably::Models::PresenceMessage::ACTION.Leave, presence_message
+ end
end
end
end
end