lib/ably/realtime/presence/members_map.rb in ably-1.2.4 vs lib/ably/realtime/presence/members_map.rb in ably-1.2.6

- old
+ new

@@ -21,11 +21,11 @@ STATE = ruby_enum('STATE', :initialized, :sync_starting, # Indicates the client is waiting for SYNC ProtocolMessages from Ably :sync_none, # Indicates the ATTACHED ProtocolMessage had no presence flag and thus no members on the channel :finalizing_sync, - :in_sync, + :sync_complete, # Indicates completion of server initiated sync :failed ) include Ably::Modules::StateEmitter def initialize(presence) @@ -47,20 +47,10 @@ @sync_session_id = -1 setup_event_handlers end - # When attaching to a channel that has members present, the server - # initiates a sync automatically so that the client has a complete list of members. - # - # Until this sync is complete, this method returns false - # - # @return [Boolean] - def sync_complete? - in_sync? - end - # Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed. # If the serial is nil, or the part after the first : is empty, then the SYNC is complete # # @return [void] # @@ -108,31 +98,31 @@ result_block.call else # Must be defined before subsequent procs reference this callback reset_callbacks = nil - in_sync_callback = lambda do + sync_complete_callback = lambda do reset_callbacks.call if reset_callbacks result_block.call end - failed_callback = lambda do |error| + sync_failed_callback = lambda do |error| reset_callbacks.call if reset_callbacks deferrable.fail error end reset_callbacks = lambda do - off(&in_sync_callback) - off(&failed_callback) - channel.off(&failed_callback) + off(&sync_complete_callback) + off(&sync_failed_callback) + channel.off(&sync_failed_callback) end - unsafe_once(:in_sync, &in_sync_callback) - unsafe_once(:failed, &failed_callback) + unsafe_once(:sync_complete, &sync_complete_callback) + unsafe_once(:failed, &sync_failed_callback) channel.unsafe_once(:detaching, :detached, :failed) do |error_reason| - failed_callback.call error_reason + sync_failed_callback.call error_reason end end deferrable end @@ -154,16 +144,37 @@ # A copy of the local members present i.e. members entered from this connection # and thus the responsibility of this library to re-enter on the channel automatically if the # channel loses continuity # - # @return [Array<PresenceMessage>] + # @return [Hash<String, PresenceMessage>] # @api private def local_members @local_members end + def enter_local_members + local_members.values.each do |member| + logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{member.client_id} with data: #{member.data}" } + presence.enter_client_with_id(member.id, member.client_id, member.data).tap do |deferrable| + deferrable.errback do |error| + re_enter_error = Ably::Models::ErrorInfo.new( + message: "unable to automatically re-enter presence channel for client_id '#{member.client_id}'. Source error code #{error.code} and message '#{error.message}'", + code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL + ) + channel.emit :update, Ably::Models::ChannelStateChange.new( + current: channel.state, + previous: channel.state, + event: Ably::Realtime::Channel::EVENT(:update), + reason: re_enter_error, + resumed: true + ) + end + end + end + end + private attr_reader :sync_session_id def members @members @@ -211,94 +222,33 @@ client.logger.error error_message end update_members_and_emit_events presence_message end + # RTP5a channel.unsafe_on(:failed, :detached) do reset_members reset_local_members end - resume_sync_proc = method(:resume_sync).to_proc - unsafe_on(:sync_starting) do @sync_session_id += 1 - - channel.unsafe_once(:attached) do - connection.on_resume(&resume_sync_proc) - end - - unsafe_once(:in_sync, :failed) do - connection.off_resume(&resume_sync_proc) - end end unsafe_on(:sync_none) do @sync_session_id += 1 # Immediately change to finalizing which will result in all members being cleaned up change_state :finalizing_sync end unsafe_on(:finalizing_sync) do clean_up_absent_members - clean_up_members_not_present_in_sync - change_state :in_sync + clean_up_members_not_present_after_sync + change_state :sync_complete end - - unsafe_on(:in_sync) do - update_local_member_state - end end - # Listen for events that change the PresenceMap state and thus - # need to be replicated to the local member set - def update_local_member_state - new_local_members = members.select do |member_key, member| - member.fetch(:message).connection_id == connection.id - end.each_with_object({}) do |(member_key, member), hash_object| - hash_object[member_key] = member.fetch(:message) - end - - @local_members.reject do |member_key, message| - new_local_members.keys.include?(member_key) - end.each do |member_key, message| - re_enter_local_member_missing_from_presence_map message - end - - @local_members = new_local_members - end - - def re_enter_local_member_missing_from_presence_map(presence_message) - local_client_id = presence_message.client_id || client.auth.client_id - logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{presence_message.data}" } - presence.enter_client(local_client_id, presence_message.data).tap do |deferrable| - deferrable.errback do |error| - presence_message_client_id = presence_message.client_id || client.auth.client_id - re_enter_error = Ably::Models::ErrorInfo.new( - message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'", - code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL - ) - channel.emit :update, Ably::Models::ChannelStateChange.new( - current: channel.state, - previous: channel.state, - event: Ably::Realtime::Channel::EVENT(:update), - reason: re_enter_error, - resumed: true - ) - end - end - end - - # Trigger a manual SYNC operation to resume member synchronisation from last known cursor position - def resume_sync - connection.send_protocol_message( - action: Ably::Models::ProtocolMessage::ACTION.Sync.to_i, - channel: channel.name, - channel_serial: sync_serial - ) if channel.attached? - end - def update_members_and_emit_events(presence_message) return unless ensure_presence_message_is_valid(presence_message) unless should_update_member?(presence_message) logger.debug { "#{self.class.name}: Skipped presence member #{presence_message.action} on channel #{presence.channel.name}.\n#{presence_message.to_json}" } @@ -373,11 +323,11 @@ end def remove_presence_member(presence_message) logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" } - if in_sync? + if sync_complete? member_set_delete presence_message else member_set_upsert presence_message, false absent_member_cleanup_queue << presence_message end @@ -392,21 +342,20 @@ end def member_set_upsert(presence_message, present) members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id } if presence_message.connection_id == connection.id - local_members[presence_message.member_key] = presence_message - logger.debug { "#{self.class.name}: Local member '#{presence_message.member_key}' added" } + local_members[presence_message.client_id] = presence_message + logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" } end end def member_set_delete(presence_message) members.delete presence_message.member_key - if in_sync? - # If not in SYNC, then local members missing may need to be re-entered - # Let #update_local_member_state handle missing members - local_members.delete presence_message.member_key + if sync_complete? and presence_message.connection_id == connection.id + local_members.delete presence_message.client_id + logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" } end end def present_members members.select do |key, presence| @@ -429,10 +378,10 @@ logger.debug { "#{self.class.name}: Cleaning up absent member '#{member_to_remove.member_key}' after SYNC.\n#{member_to_remove.to_json}" } member_set_delete member_to_remove end end - def clean_up_members_not_present_in_sync + def clean_up_members_not_present_after_sync members.select do |member_key, member| member.fetch(:sync_session_id) != sync_session_id end.each do |member_key, member| presence_message = member.fetch(:message).shallow_clone(action: Ably::Models::PresenceMessage::ACTION.Leave, id: nil) logger.debug { "#{self.class.name}: Fabricating a LEAVE event for member '#{presence_message.member_key}' was not present in recently completed SYNC session ID '#{sync_session_id}'.\n#{presence_message.to_json}" }