lib/ably/realtime/presence.rb in ably-0.7.2 vs lib/ably/realtime/presence.rb in ably-0.7.4

- old
+ new

@@ -1,10 +1,12 @@ module Ably::Realtime # Presence provides access to presence operations and state for the associated Channel class Presence include Ably::Modules::EventEmitter include Ably::Modules::AsyncWrapper + include Ably::Modules::MessageEmitter + include Ably::Modules::SafeYield extend Ably::Modules::Enum STATE = ruby_enum('STATE', :initialized, :entering, @@ -12,10 +14,11 @@ :leaving, :left, :failed ) include Ably::Modules::StateEmitter + include Ably::Modules::UsesStateMachine # {Ably::Realtime::Channel} this Presence object is associated with # @return [Ably::Realtime::Channel] attr_reader :channel @@ -30,18 +33,28 @@ # The data for the member present on this channel # @return [String] attr_reader :data + # {MembersMap} containing an up to date list of members on this channel + # @return [MembersMap] + # @api private + attr_reader :members + + # The Presence manager responsible for actions relating to state changes such as entering a channel + # @return [Ably::Realtime::Presence::PresenceManager] + # @api private + attr_reader :manager + def initialize(channel) @channel = channel - @state = STATE.Initialized - @members = Hash.new - @subscriptions = Hash.new { |hash, key| hash[key] = [] } @client_id = client.client_id - setup_event_handlers + @state_machine = PresenceStateMachine.new(self) + @state = STATE(state_machine.current_state) + @members = MembersMap.new(self) + @manager = PresenceManager.new(self) end # Enter this client into this channel. This client will be added to the presence set # and presence subscribers will see an enter message for this client. # @@ -51,16 +64,16 @@ # This option is provided to support connections from server instances that act on behalf of # multiple client_ids. In order to be able to enter the channel with this method, the client # library must have been instanced either with a key, or with a token bound to the wildcard clientId. # # @yield [Ably::Realtime::Presence] On success, will call the block with this {Ably::Realtime::Presence} object - # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callbacks + # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks # def enter(options = {}, &success_block) @client_id = options.fetch(:client_id, client_id) @data = options.fetch(:data, nil) - deferrable = EventMachine::DefaultDeferrable.new + deferrable = create_deferrable raise Ably::Exceptions::Standard.new('Unable to enter presence channel without a client_id', 400, 91000) unless client_id return deferrable_succeed(deferrable, &success_block) if state == STATE.Entered ensure_channel_attached(deferrable) do @@ -94,11 +107,11 @@ # # @param [Hash] options an options Hash for this client event # @option options [String] :data optional data (eg a status message) for this member # # @yield [Ably::Realtime::Presence] On success, will call the block with this {Ably::Realtime::Presence} object - # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callbacks + # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks # def enter_client(client_id, options = {}, &success_block) raise ArgumentError, 'options must be a Hash' unless options.kind_of?(Hash) raise Ably::Exceptions::Standard.new('Unable to enter presence channel without a client_id', 400, 91000) unless client_id @@ -114,11 +127,11 @@ # @yield (see Presence#enter) # @return (see Presence#enter) # def leave(options = {}, &success_block) @data = options.fetch(:data, data) # nil value defaults leave data to existing value - deferrable = EventMachine::DefaultDeferrable.new + deferrable = create_deferrable raise Ably::Exceptions::Standard.new('Unable to leave presence channel that is not entered', 400, 91002) unless able_to_leave? return deferrable_succeed(deferrable, &success_block) if state == STATE.Left ensure_channel_attached(deferrable) do @@ -167,11 +180,11 @@ # @yield (see Presence#enter) # @return (see Presence#enter) # def update(options = {}, &success_block) @data = options.fetch(:data, nil) - deferrable = EventMachine::DefaultDeferrable.new + deferrable = create_deferrable ensure_channel_attached(deferrable) do send_protocol_message_and_transition_state_to( Ably::Models::PresenceMessage::ACTION.Update, deferrable: deferrable, @@ -202,207 +215,103 @@ send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Update, client_id, options, &success_block) end # Get the presence state for this Channel. # - # @param [Hash,String] options an options Hash to filter members - # @option options [String] :client_id optional client_id for the member - # @option options [String] :connection_id optional connection_id for the member - # @option options [String] :wait_for_sync defaults to true, if false the get method returns the current list of members and does not wait for the presence sync to complete + # @param (see Ably::Realtime::Presence::MembersMap#get) + # @option options (see Ably::Realtime::Presence::MembersMap#get) + # @yield (see Ably::Realtime::Presence::MembersMap#get) + # @return (see Ably::Realtime::Presence::MembersMap#get) # - # @yield [Array<Ably::Models::PresenceMessage>] array of members or the member - # - # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callbacks - # - def get(options = {}) - wait_for_sync = options.fetch(:wait_for_sync, true) - deferrable = EventMachine::DefaultDeferrable.new + def get(options = {}, &block) + deferrable = create_deferrable ensure_channel_attached(deferrable) do - result_block = proc do - members.map { |key, presence| presence }.tap do |filtered_members| - filtered_members.keep_if { |presence| presence.connection_id == options[:connection_id] } if options[:connection_id] - filtered_members.keep_if { |presence| presence.client_id == options[:client_id] } if options[:client_id] - end.tap do |current_members| - yield current_members if block_given? - deferrable.succeed current_members + members.get(options).tap do |members_map_deferrable| + members_map_deferrable.callback do |*args| + safe_yield block, *args if block_given? + deferrable.succeed *args end - end - - if !wait_for_sync || sync_complete? - result_block.call - else - sync_pubsub.once(:done) do - result_block.call + members_map_deferrable.errback do |*args| + deferrable.fail *args end - - sync_pubsub.once(:failed) do |error| - deferrable.fail error - end end end end # Subscribe to presence events on the associated Channel. # This implicitly attaches the Channel if it is not already attached. # - # @param action [Ably::Models::PresenceMessage::ACTION] Optional, the state change action to subscribe to. Defaults to all presence actions + # @param actions [Ably::Models::PresenceMessage::ACTION] Optional, the state change action to subscribe to. Defaults to all presence actions # @yield [Ably::Models::PresenceMessage] For each presence state change event, the block is called # # @return [void] # - def subscribe(action = :all, &callback) + def subscribe(*actions, &callback) ensure_channel_attached do - subscriptions[message_action_key(action)] << callback + super end end # Unsubscribe the matching block for presence events on the associated Channel. # If a block is not provided, all subscriptions will be unsubscribed # - # @param action [Ably::Models::PresenceMessage::ACTION] Optional, the state change action to subscribe to. Defaults to all presence actions + # @param actions [Ably::Models::PresenceMessage::ACTION] Optional, the state change action to subscribe to. Defaults to all presence actions # # @return [void] # - def unsubscribe(action = :all, &callback) - if message_action_key(action) == :all - subscriptions.keys - else - Array(message_action_key(action)) - end.each do |key| - subscriptions[key].delete_if do |block| - !block_given? || callback == block - end - end + def unsubscribe(*actions, &callback) + super end # Return the presence messages history for the channel # # @param (see Ably::Rest::Presence#history) # @option options (see Ably::Rest::Presence#history) # # @yield [Ably::Models::PaginatedResource<Ably::Models::PresenceMessage>] An Array of {Ably::Models::PresenceMessage} objects that supports paging (#next_page, #first_page) # - # @return [EventMachine::Deferrable] + # @return [Ably::Util::SafeDeferrable] # def history(options = {}, &callback) async_wrap(callback) do rest_presence.history(options.merge(async_blocking_operations: true)) end end - # When attaching to a channel that has members present, the client and server - # initiate a sync automatically so that the client has a complete list of members. - # - # Whilst this sync is happening, this method returns false - # - # @return [Boolean] - def sync_complete? - sync_complete - end - - # Expect SYNC ProtocolMessages with a list of current members on this channel from the server - # - # @return [void] - # - # @api private - def sync_started - @sync_complete = false - - sync_pubsub.once(:sync_complete) do - sync_changes_backlog.each do |presence_message| - apply_member_presence_changes presence_message - end - sync_completed - sync_pubsub.trigger :done - end - - channel.once_or_if [:detached, :failed] do |error| - sync_completed - sync_pubsub.trigger :failed, error - end - end - - # The server has indicated that no members are present on this channel and no SYNC is expected, - # or that the SYNC has now completed - # - # @return [void] - # - # @api private - def sync_completed - @sync_complete = true - @sync_changes_backlog = [] - end - - # Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed. - # If the serial is nil, or the part after the first : is empty, then the SYNC is complete - # - # @return [void] - # - # @api private - def update_sync_serial(serial) - @sync_serial = serial - sync_pubsub.trigger :sync_complete if sync_serial_cursor_at_end? - end - # @!attribute [r] __incoming_msgbus__ - # @return [Ably::Util::PubSub] Client library internal channel incoming message bus + # @return [Ably::Util::PubSub] Client library internal channel incoming protocol message bus # @api private def __incoming_msgbus__ @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end - private - attr_reader :members, :subscriptions, :sync_serial, :sync_complete - - - # A simple PubSub class used to publish synchronisation state changes - def sync_pubsub - @sync_pubsub ||= Ably::Util::PubSub.new + # Configure the connection ID for this presence channel. + # Typically configured only once when a user first enters a presence channel. + # @api private + def set_connection_id(new_connection_id) + @connection_id = new_connection_id end - # During a SYNC of presence members, all enter, update and leave events are queued for processing once the SYNC is complete - def sync_changes_backlog - @sync_changes_backlog ||= [] + # Used by {Ably::Modules::StateEmitter} to debug action changes + # @api private + def logger + client.logger end - # When channel serial in ProtocolMessage SYNC is nil or - # an empty cursor appears after the ':' such as 'cf30e75054887:psl_7g:client:189' - # then there are no more SYNC messages to come - def sync_serial_cursor_at_end? - sync_serial.nil? || sync_serial.to_s.match(/^[\w-]+:?$/) + # Returns true when the initial member SYNC following channel attach is completed + def sync_complete? + members.sync_complete? end + private def able_to_leave? entering? || entered? end - def setup_event_handlers - __incoming_msgbus__.subscribe(:presence, :sync) do |presence_message| - presence_message.decode self.channel - update_members_from_presence_message presence_message - end - - channel.on(Channel::STATE.Detaching) do - change_state STATE.Leaving - end - - channel.on(Channel::STATE.Detached) do - change_state STATE.Left - end - - channel.on(Channel::STATE.Failed) do - change_state STATE.Failed unless left? || initialized? - end - - on(STATE.Entered) do |message| - @connection_id = message.connection_id - end - end - # @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added def send_presence_protocol_message(presence_action, client_id, options = {}) presence_message = create_presence_message(presence_action, client_id, options) unless presence_message.client_id raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, 91000) @@ -424,58 +333,15 @@ action: Ably::Models::PresenceMessage.ACTION(action).to_i, clientId: client_id } model.merge!(data: options.fetch(:data)) if options.has_key?(:data) - Ably::Models::PresenceMessage.new(model, nil).tap do |presence_message| + Ably::Models::PresenceMessage.new(model, logger: logger).tap do |presence_message| presence_message.encode self.channel end end - def update_members_from_presence_message(presence_message) - unless presence_message.connection_id - Ably::Exceptions::ProtocolError.new("Protocol error, presence message is missing connectionId", 400, 80013) - end - - if sync_complete? - apply_member_presence_changes presence_message - else - if presence_message.action == Ably::Models::PresenceMessage::ACTION.Present - add_presence_member presence_message - publish_presence_member_state_change presence_message - else - sync_changes_backlog << presence_message - end - end - end - - def apply_member_presence_changes(presence_message) - case presence_message.action - when Ably::Models::PresenceMessage::ACTION.Enter, Ably::Models::PresenceMessage::ACTION.Update - add_presence_member presence_message - when Ably::Models::PresenceMessage::ACTION.Leave - remove_presence_member presence_message - else - Ably::Exceptions::ProtocolError.new("Protocol error, unknown presence action #{presence_message.action}", 400, 80013) - end - - publish_presence_member_state_change presence_message - end - - def add_presence_member(presence_message) - members[presence_message.member_key] = presence_message - end - - def remove_presence_member(presence_message) - members.delete presence_message.member_key - end - - def publish_presence_member_state_change(presence_message) - subscriptions[:all].each { |cb| cb.call(presence_message) } - subscriptions[presence_message.action].each { |cb| cb.call(presence_message) } - end - def ensure_channel_attached(deferrable = nil) if channel.attached? yield else attach_channel_then { yield } @@ -506,38 +372,38 @@ deferrable_fail deferrable, error end end end - def deferrable_succeed(deferrable, *args) - yield self, *args if block_given? + def deferrable_succeed(deferrable, *args, &block) + safe_yield block, self, *args if block_given? EventMachine.next_tick { deferrable.succeed self, *args } # allow callback to be added to the returned Deferrable before calling succeed deferrable end - def deferrable_fail(deferrable, *args) - yield self, *args if block_given? + def deferrable_fail(deferrable, *args, &block) + safe_yield block, self, *args if block_given? EventMachine.next_tick { deferrable.fail self, *args } # allow errback to be added to the returned Deferrable deferrable - end + end def send_presence_action_for_client(action, client_id, options = {}, &success_block) - deferrable = EventMachine::DefaultDeferrable.new + deferrable = create_deferrable ensure_channel_attached(deferrable) do send_presence_protocol_message(action, client_id, options).tap do |protocol_message| protocol_message.callback { |message| deferrable_succeed deferrable, &success_block } - protocol_message.errback { |message| deferrable_fail deferrable } + protocol_message.errback { |message, error| deferrable_fail deferrable, error } end end end def attach_channel_then if channel.detached? || channel.failed? - raise Ably::Exceptions::Standard.new('Unable to enter presence channel in detached or failed action', 400, 91001) + raise Ably::Exceptions::IncompatibleStateForOperation.new("Operation is not allowed when channel is in #{channel.state}", 400, 91001) else - channel.once(Channel::STATE.Attached) { yield } + channel.unsafe_once(Channel::STATE.Attached) { yield } channel.attach end end def client @@ -546,19 +412,19 @@ def rest_presence client.rest_client.channel(channel.name).presence end - # Used by {Ably::Modules::StateEmitter} to debug action changes - def logger - client.logger + # Force subscriptions to match valid PresenceMessage actions + def message_emitter_subscriptions_coerce_message_key(name) + Ably::Models::PresenceMessage.ACTION(name) end - def message_action_key(action) - if action == :all - :all - else - Ably::Models::PresenceMessage.ACTION(action) - end + def create_deferrable + Ably::Util::SafeDeferrable.new(logger) end end end + +require 'ably/realtime/presence/presence_manager' +require 'ably/realtime/presence/members_map' +require 'ably/realtime/presence/presence_state_machine'