lib/ably/realtime/presence.rb in ably-0.8.15 vs lib/ably/realtime/presence.rb in ably-1.0.0

- old
+ new

@@ -11,25 +11,19 @@ STATE = ruby_enum('STATE', :initialized, :entering, :entered, :leaving, - :left, - :failed + :left ) include Ably::Modules::StateEmitter include Ably::Modules::UsesStateMachine # {Ably::Realtime::Channel} this Presence object is associated with # @return [Ably::Realtime::Channel] attr_reader :channel - # A unique identifier for this channel client based on their connection, disambiguating situations - # where a given client_id is present on multiple connections simultaneously. - # @return [String] - attr_reader :connection_id - # The client_id for the member present on this channel # @return [String] attr_reader :client_id # The data for the member present on this channel @@ -70,25 +64,28 @@ ensure_supported_payload data @data = data return deferrable_succeed(deferrable, &success_block) if state == STATE.Entered - ensure_presence_publishable_on_connection + requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable + return requirements_failed_deferrable if requirements_failed_deferrable + ensure_channel_attached(deferrable) do if entering? once_or_if(STATE.Entered, else: proc { |args| deferrable_fail deferrable, *args }) do deferrable_succeed deferrable, &success_block end else + current_state = state change_state STATE.Entering send_protocol_message_and_transition_state_to( Ably::Models::PresenceMessage::ACTION.Enter, deferrable: deferrable, target_state: STATE.Entered, data: data, client_id: client_id, - failed_state: STATE.Failed, + failed_state: current_state, # return to current state if enter fails &success_block ) end end end @@ -123,31 +120,33 @@ # def leave(data = nil, &success_block) deferrable = create_deferrable ensure_supported_payload data - raise Ably::Exceptions::Standard.new('Unable to leave presence channel that is not entered', 400, 91002) unless able_to_leave? @data = data return deferrable_succeed(deferrable, &success_block) if state == STATE.Left - ensure_presence_publishable_on_connection + requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable + return requirements_failed_deferrable if requirements_failed_deferrable + ensure_channel_attached(deferrable) do if leaving? once_or_if(STATE.Left, else: proc { |error|deferrable_fail deferrable, *args }) do deferrable_succeed deferrable, &success_block end else + current_state = state change_state STATE.Leaving send_protocol_message_and_transition_state_to( Ably::Models::PresenceMessage::ACTION.Leave, deferrable: deferrable, target_state: STATE.Left, data: data, client_id: client_id, - failed_state: STATE.Failed, + failed_state: current_state, # return to current state if leave fails &success_block ) end end end @@ -181,11 +180,13 @@ ensure_supported_payload data @data = data - ensure_presence_publishable_on_connection + requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable + return requirements_failed_deferrable if requirements_failed_deferrable + ensure_channel_attached(deferrable) do send_protocol_message_and_transition_state_to( Ably::Models::PresenceMessage::ACTION.Update, deferrable: deferrable, target_state: STATE.Entered, @@ -212,25 +213,39 @@ ensure_supported_payload data send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Update, client_id, data, &success_block) end - # Get the presence state for this Channel. + # Get the presence members for this Channel. # # @param (see Ably::Realtime::Presence::MembersMap#get) # @option options (see Ably::Realtime::Presence::MembersMap#get) # @yield (see Ably::Realtime::Presence::MembersMap#get) # @return (see Ably::Realtime::Presence::MembersMap#get) # def get(options = {}, &block) deferrable = create_deferrable - ensure_channel_attached(deferrable) do + # #RTP11d Don't return PresenceMap when wait for sync is true + # if the map is stale + wait_for_sync = options.fetch(:wait_for_sync, true) + if wait_for_sync && channel.suspended? + EventMachine.next_tick do + deferrable.fail Ably::Exceptions::InvalidState.new( + 'Presence state is out of sync as channel is SUSPENDED. Presence#get on a SUSPENDED channel is only supported with option wait_for_sync: false', + nil, + 91005 + ) + end + return deferrable + end + + ensure_channel_attached(deferrable, allow_suspended: true) do members.get(options).tap do |members_map_deferrable| - members_map_deferrable.callback do |*args| - safe_yield(block, *args) if block_given? - deferrable.succeed(*args) + members_map_deferrable.callback do |members| + safe_yield(block, members) if block_given? + deferrable.succeed(members) end members_map_deferrable.errback do |*args| deferrable.fail(*args) end end @@ -244,13 +259,12 @@ # @yield [Ably::Models::PresenceMessage] For each presence state change event, the block is called # # @return [void] # def subscribe(*actions, &callback) - ensure_channel_attached do - super - end + implicit_attach + super end # Unsubscribe the matching block for presence events on the associated Channel. # If a block is not provided, all subscriptions will be unsubscribed # @@ -277,11 +291,14 @@ # # @return [Ably::Util::SafeDeferrable] # def history(options = {}, &callback) if options.delete(:until_attach) - raise ArgumentError, 'option :until_attach cannot be specified if the channel is not attached' unless channel.attached? + unless channel.attached? + error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached') + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) + end options[:from_serial] = channel.attached_serial end async_wrap(callback) do rest_presence.history(options.merge(async_blocking_operations: true)) @@ -295,17 +312,10 @@ @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end - # Configure the connection ID for this presence channel. - # Typically configured only once when a user first enters a presence channel. - # @api private - def set_connection_id(new_connection_id) - @connection_id = new_connection_id - end - # Used by {Ably::Modules::StateEmitter} to debug action changes # @api private def logger client.logger end @@ -314,14 +324,10 @@ def sync_complete? members.sync_complete? end private - def able_to_leave? - entering? || entered? - end - # @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added def send_presence_protocol_message(presence_action, client_id, data) presence_message = create_presence_message(presence_action, client_id, data) unless presence_message.client_id raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, 91000) @@ -344,38 +350,46 @@ clientId: client_id, data: data } Ably::Models::PresenceMessage.new(model, logger: logger).tap do |presence_message| - presence_message.encode self.channel + presence_message.encode(client.encoders, channel.options) do |encode_error, error_message| + client.logger.error error_message + end end end - def ensure_presence_publishable_on_connection + def ensure_presence_publishable_on_connection_deferrable if !connection.can_publish_messages? - raise Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}") + error = Ably::Exceptions::MessageQueueingDisabled.new("Presence event cannot be published as they cannot be queued when the connection is #{connection.state}") + Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end end - def ensure_channel_attached(deferrable = nil) + def ensure_channel_attached(deferrable = nil, options = {}) if channel.attached? yield + elsif options[:allow_suspended] && channel.suspended? + yield else - attach_channel_then { yield } + attach_channel_then(deferrable) { yield } end deferrable end def ensure_supported_client_id(check_client_id) unless check_client_id - raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave presence channel without a client_id', 400, 40012) + raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave presence channel without a client_id') end if check_client_id == '*' - raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave presence channel with the reserved wildcard client_id', 400, 40012) + raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave presence channel with the reserved wildcard client_id') end + unless check_client_id.kind_of?(String) + raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave with a non String client_id value') + end unless client.auth.can_assume_client_id?(check_client_id) - raise Ably::Exceptions::IncompatibleClientId.new("Cannot enter with provided client_id '#{check_client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'", 400, 40012) + raise Ably::Exceptions::IncompatibleClientId.new("Cannot enter with provided client_id '#{check_client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'") end end def send_protocol_message_and_transition_state_to(action, options = {}, &success_block) deferrable = options.fetch(:deferrable) { raise ArgumentError, 'option :deferrable is required' } @@ -407,27 +421,38 @@ EventMachine.next_tick { deferrable.fail(*args) } # allow errback to be added to the returned Deferrable deferrable end def send_presence_action_for_client(action, client_id, data, &success_block) - ensure_presence_publishable_on_connection + requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable + return requirements_failed_deferrable if requirements_failed_deferrable deferrable = create_deferrable ensure_channel_attached(deferrable) do send_presence_protocol_message(action, client_id, data).tap do |protocol_message| protocol_message.callback { |message| deferrable_succeed deferrable, &success_block } protocol_message.errback { |error| deferrable_fail deferrable, error } end end end - def attach_channel_then + def attach_channel_then(deferrable) if channel.detached? || channel.failed? - raise Ably::Exceptions::InvalidStateChange.new("Operation is not allowed when channel is in #{channel.state}", 400, 91001) + deferrable.fail Ably::Exceptions::InvalidState.new("Operation is not allowed when channel is in #{channel.state}", 400, 91001) else - channel.unsafe_once(Channel::STATE.Attached) { yield } + channel.unsafe_once(:attached, :detached, :failed) do |channel_state_change| + if channel_state_change.current == :attached + yield + else + deferrable.fail Ably::Exceptions::InvalidState.new("Operation failed as channel transitioned to #{channel_state_change.current}", 400, 91001) + end + end channel.attach end + end + + def implicit_attach + channel.attach if channel.initialized? end def client channel.client end