lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb in ably-rest-1.2.4 vs lib/submodules/ably-ruby/lib/ably/realtime/presence/members_map.rb in ably-rest-1.2.6
- old
+ new
@@ -21,11 +21,11 @@
STATE = ruby_enum('STATE',
:initialized,
:sync_starting, # Indicates the client is waiting for SYNC ProtocolMessages from Ably
:sync_none, # Indicates the ATTACHED ProtocolMessage had no presence flag and thus no members on the channel
:finalizing_sync,
- :in_sync,
+ :sync_complete, # Indicates completion of server initiated sync
:failed
)
include Ably::Modules::StateEmitter
def initialize(presence)
@@ -47,20 +47,10 @@
@sync_session_id = -1
setup_event_handlers
end
- # When attaching to a channel that has members present, the server
- # initiates a sync automatically so that the client has a complete list of members.
- #
- # Until this sync is complete, this method returns false
- #
- # @return [Boolean]
- def sync_complete?
- in_sync?
- end
-
# Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed.
# If the serial is nil, or the part after the first : is empty, then the SYNC is complete
#
# @return [void]
#
@@ -108,31 +98,31 @@
result_block.call
else
# Must be defined before subsequent procs reference this callback
reset_callbacks = nil
- in_sync_callback = lambda do
+ sync_complete_callback = lambda do
reset_callbacks.call if reset_callbacks
result_block.call
end
- failed_callback = lambda do |error|
+ sync_failed_callback = lambda do |error|
reset_callbacks.call if reset_callbacks
deferrable.fail error
end
reset_callbacks = lambda do
- off(&in_sync_callback)
- off(&failed_callback)
- channel.off(&failed_callback)
+ off(&sync_complete_callback)
+ off(&sync_failed_callback)
+ channel.off(&sync_failed_callback)
end
- unsafe_once(:in_sync, &in_sync_callback)
- unsafe_once(:failed, &failed_callback)
+ unsafe_once(:sync_complete, &sync_complete_callback)
+ unsafe_once(:failed, &sync_failed_callback)
channel.unsafe_once(:detaching, :detached, :failed) do |error_reason|
- failed_callback.call error_reason
+ sync_failed_callback.call error_reason
end
end
deferrable
end
@@ -154,16 +144,37 @@
# A copy of the local members present i.e. members entered from this connection
# and thus the responsibility of this library to re-enter on the channel automatically if the
# channel loses continuity
#
- # @return [Array<PresenceMessage>]
+ # @return [Hash<String, PresenceMessage>]
# @api private
def local_members
@local_members
end
+ def enter_local_members
+ local_members.values.each do |member|
+ logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{member.client_id} with data: #{member.data}" }
+ presence.enter_client_with_id(member.id, member.client_id, member.data).tap do |deferrable|
+ deferrable.errback do |error|
+ re_enter_error = Ably::Models::ErrorInfo.new(
+ message: "unable to automatically re-enter presence channel for client_id '#{member.client_id}'. Source error code #{error.code} and message '#{error.message}'",
+ code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
+ )
+ channel.emit :update, Ably::Models::ChannelStateChange.new(
+ current: channel.state,
+ previous: channel.state,
+ event: Ably::Realtime::Channel::EVENT(:update),
+ reason: re_enter_error,
+ resumed: true
+ )
+ end
+ end
+ end
+ end
+
private
attr_reader :sync_session_id
def members
@members
@@ -211,94 +222,33 @@
client.logger.error error_message
end
update_members_and_emit_events presence_message
end
+ # RTP5a
channel.unsafe_on(:failed, :detached) do
reset_members
reset_local_members
end
- resume_sync_proc = method(:resume_sync).to_proc
-
unsafe_on(:sync_starting) do
@sync_session_id += 1
-
- channel.unsafe_once(:attached) do
- connection.on_resume(&resume_sync_proc)
- end
-
- unsafe_once(:in_sync, :failed) do
- connection.off_resume(&resume_sync_proc)
- end
end
unsafe_on(:sync_none) do
@sync_session_id += 1
# Immediately change to finalizing which will result in all members being cleaned up
change_state :finalizing_sync
end
unsafe_on(:finalizing_sync) do
clean_up_absent_members
- clean_up_members_not_present_in_sync
- change_state :in_sync
+ clean_up_members_not_present_after_sync
+ change_state :sync_complete
end
-
- unsafe_on(:in_sync) do
- update_local_member_state
- end
end
- # Listen for events that change the PresenceMap state and thus
- # need to be replicated to the local member set
- def update_local_member_state
- new_local_members = members.select do |member_key, member|
- member.fetch(:message).connection_id == connection.id
- end.each_with_object({}) do |(member_key, member), hash_object|
- hash_object[member_key] = member.fetch(:message)
- end
-
- @local_members.reject do |member_key, message|
- new_local_members.keys.include?(member_key)
- end.each do |member_key, message|
- re_enter_local_member_missing_from_presence_map message
- end
-
- @local_members = new_local_members
- end
-
- def re_enter_local_member_missing_from_presence_map(presence_message)
- local_client_id = presence_message.client_id || client.auth.client_id
- logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{presence_message.data}" }
- presence.enter_client(local_client_id, presence_message.data).tap do |deferrable|
- deferrable.errback do |error|
- presence_message_client_id = presence_message.client_id || client.auth.client_id
- re_enter_error = Ably::Models::ErrorInfo.new(
- message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'",
- code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
- )
- channel.emit :update, Ably::Models::ChannelStateChange.new(
- current: channel.state,
- previous: channel.state,
- event: Ably::Realtime::Channel::EVENT(:update),
- reason: re_enter_error,
- resumed: true
- )
- end
- end
- end
-
- # Trigger a manual SYNC operation to resume member synchronisation from last known cursor position
- def resume_sync
- connection.send_protocol_message(
- action: Ably::Models::ProtocolMessage::ACTION.Sync.to_i,
- channel: channel.name,
- channel_serial: sync_serial
- ) if channel.attached?
- end
-
def update_members_and_emit_events(presence_message)
return unless ensure_presence_message_is_valid(presence_message)
unless should_update_member?(presence_message)
logger.debug { "#{self.class.name}: Skipped presence member #{presence_message.action} on channel #{presence.channel.name}.\n#{presence_message.to_json}" }
@@ -373,11 +323,11 @@
end
def remove_presence_member(presence_message)
logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" }
- if in_sync?
+ if sync_complete?
member_set_delete presence_message
else
member_set_upsert presence_message, false
absent_member_cleanup_queue << presence_message
end
@@ -392,21 +342,20 @@
end
def member_set_upsert(presence_message, present)
members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id }
if presence_message.connection_id == connection.id
- local_members[presence_message.member_key] = presence_message
- logger.debug { "#{self.class.name}: Local member '#{presence_message.member_key}' added" }
+ local_members[presence_message.client_id] = presence_message
+ logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" }
end
end
def member_set_delete(presence_message)
members.delete presence_message.member_key
- if in_sync?
- # If not in SYNC, then local members missing may need to be re-entered
- # Let #update_local_member_state handle missing members
- local_members.delete presence_message.member_key
+ if sync_complete? and presence_message.connection_id == connection.id
+ local_members.delete presence_message.client_id
+ logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" }
end
end
def present_members
members.select do |key, presence|
@@ -429,10 +378,10 @@
logger.debug { "#{self.class.name}: Cleaning up absent member '#{member_to_remove.member_key}' after SYNC.\n#{member_to_remove.to_json}" }
member_set_delete member_to_remove
end
end
- def clean_up_members_not_present_in_sync
+ def clean_up_members_not_present_after_sync
members.select do |member_key, member|
member.fetch(:sync_session_id) != sync_session_id
end.each do |member_key, member|
presence_message = member.fetch(:message).shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Leave, id: nil)
logger.debug { "#{self.class.name}: Fabricating a LEAVE event for member '#{presence_message.member_key}' was not present in recently completed SYNC session ID '#{sync_session_id}'.\n#{presence_message.to_json}" }