lib/ably/realtime/presence.rb in ably-0.8.15 vs lib/ably/realtime/presence.rb in ably-1.0.0
- old
+ new
@@ -11,25 +11,19 @@
STATE = ruby_enum('STATE',
:initialized,
:entering,
:entered,
:leaving,
- :left,
- :failed
+ :left
)
include Ably::Modules::StateEmitter
include Ably::Modules::UsesStateMachine
# {Ably::Realtime::Channel} this Presence object is associated with
# @return [Ably::Realtime::Channel]
attr_reader :channel
- # A unique identifier for this channel client based on their connection, disambiguating situations
- # where a given client_id is present on multiple connections simultaneously.
- # @return [String]
- attr_reader :connection_id
-
# The client_id for the member present on this channel
# @return [String]
attr_reader :client_id
# The data for the member present on this channel
@@ -70,25 +64,28 @@
ensure_supported_payload data
@data = data
return deferrable_succeed(deferrable, &success_block) if state == STATE.Entered
- ensure_presence_publishable_on_connection
+ requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable
+ return requirements_failed_deferrable if requirements_failed_deferrable
+
ensure_channel_attached(deferrable) do
if entering?
once_or_if(STATE.Entered, else: proc { |args| deferrable_fail deferrable, *args }) do
deferrable_succeed deferrable, &success_block
end
else
+ current_state = state
change_state STATE.Entering
send_protocol_message_and_transition_state_to(
Ably::Models::PresenceMessage::ACTION.Enter,
deferrable: deferrable,
target_state: STATE.Entered,
data: data,
client_id: client_id,
- failed_state: STATE.Failed,
+ failed_state: current_state, # return to current state if enter fails
&success_block
)
end
end
end
@@ -123,31 +120,33 @@
#
def leave(data = nil, &success_block)
deferrable = create_deferrable
ensure_supported_payload data
- raise Ably::Exceptions::Standard.new('Unable to leave presence channel that is not entered', 400, 91002) unless able_to_leave?
@data = data
return deferrable_succeed(deferrable, &success_block) if state == STATE.Left
- ensure_presence_publishable_on_connection
+ requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable
+ return requirements_failed_deferrable if requirements_failed_deferrable
+
ensure_channel_attached(deferrable) do
if leaving?
once_or_if(STATE.Left, else: proc { |error|deferrable_fail deferrable, *args }) do
deferrable_succeed deferrable, &success_block
end
else
+ current_state = state
change_state STATE.Leaving
send_protocol_message_and_transition_state_to(
Ably::Models::PresenceMessage::ACTION.Leave,
deferrable: deferrable,
target_state: STATE.Left,
data: data,
client_id: client_id,
- failed_state: STATE.Failed,
+ failed_state: current_state, # return to current state if leave fails
&success_block
)
end
end
end
@@ -181,11 +180,13 @@
ensure_supported_payload data
@data = data
- ensure_presence_publishable_on_connection
+ requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable
+ return requirements_failed_deferrable if requirements_failed_deferrable
+
ensure_channel_attached(deferrable) do
send_protocol_message_and_transition_state_to(
Ably::Models::PresenceMessage::ACTION.Update,
deferrable: deferrable,
target_state: STATE.Entered,
@@ -212,25 +213,39 @@
ensure_supported_payload data
send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Update, client_id, data, &success_block)
end
- # Get the presence state for this Channel.
+ # Get the presence members for this Channel.
#
# @param (see Ably::Realtime::Presence::MembersMap#get)
# @option options (see Ably::Realtime::Presence::MembersMap#get)
# @yield (see Ably::Realtime::Presence::MembersMap#get)
# @return (see Ably::Realtime::Presence::MembersMap#get)
#
def get(options = {}, &block)
deferrable = create_deferrable
- ensure_channel_attached(deferrable) do
+ # #RTP11d Don't return PresenceMap when wait for sync is true
+ # if the map is stale
+ wait_for_sync = options.fetch(:wait_for_sync, true)
+ if wait_for_sync && channel.suspended?
+ EventMachine.next_tick do
+ deferrable.fail Ably::Exceptions::InvalidState.new(
+ 'Presence state is out of sync as channel is SUSPENDED. Presence#get on a SUSPENDED channel is only supported with option wait_for_sync: false',
+ nil,
+ 91005
+ )
+ end
+ return deferrable
+ end
+
+ ensure_channel_attached(deferrable, allow_suspended: true) do
members.get(options).tap do |members_map_deferrable|
- members_map_deferrable.callback do |*args|
- safe_yield(block, *args) if block_given?
- deferrable.succeed(*args)
+ members_map_deferrable.callback do |members|
+ safe_yield(block, members) if block_given?
+ deferrable.succeed(members)
end
members_map_deferrable.errback do |*args|
deferrable.fail(*args)
end
end
@@ -244,13 +259,12 @@
# @yield [Ably::Models::PresenceMessage] For each presence state change event, the block is called
#
# @return [void]
#
def subscribe(*actions, &callback)
- ensure_channel_attached do
- super
- end
+ implicit_attach
+ super
end
# Unsubscribe the matching block for presence events on the associated Channel.
# If a block is not provided, all subscriptions will be unsubscribed
#
@@ -277,11 +291,14 @@
#
# @return [Ably::Util::SafeDeferrable]
#
def history(options = {}, &callback)
if options.delete(:until_attach)
- raise ArgumentError, 'option :until_attach cannot be specified if the channel is not attached' unless channel.attached?
+ unless channel.attached?
+ error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached')
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
+ end
options[:from_serial] = channel.attached_serial
end
async_wrap(callback) do
rest_presence.history(options.merge(async_blocking_operations: true))
@@ -295,17 +312,10 @@
@__incoming_msgbus__ ||= Ably::Util::PubSub.new(
coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) }
)
end
- # Configure the connection ID for this presence channel.
- # Typically configured only once when a user first enters a presence channel.
- # @api private
- def set_connection_id(new_connection_id)
- @connection_id = new_connection_id
- end
-
# Used by {Ably::Modules::StateEmitter} to debug action changes
# @api private
def logger
client.logger
end
@@ -314,14 +324,10 @@
def sync_complete?
members.sync_complete?
end
private
- def able_to_leave?
- entering? || entered?
- end
-
# @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added
def send_presence_protocol_message(presence_action, client_id, data)
presence_message = create_presence_message(presence_action, client_id, data)
unless presence_message.client_id
raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, 91000)
@@ -344,38 +350,46 @@
clientId: client_id,
data: data
}
Ably::Models::PresenceMessage.new(model, logger: logger).tap do |presence_message|
- presence_message.encode self.channel
+ presence_message.encode(client.encoders, channel.options) do |encode_error, error_message|
+ client.logger.error error_message
+ end
end
end
- def ensure_presence_publishable_on_connection
+ def ensure_presence_publishable_on_connection_deferrable
if !connection.can_publish_messages?
- raise Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}")
+ error = Ably::Exceptions::MessageQueueingDisabled.new("Presence event cannot be published as they cannot be queued when the connection is #{connection.state}")
+ Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end
end
- def ensure_channel_attached(deferrable = nil)
+ def ensure_channel_attached(deferrable = nil, options = {})
if channel.attached?
yield
+ elsif options[:allow_suspended] && channel.suspended?
+ yield
else
- attach_channel_then { yield }
+ attach_channel_then(deferrable) { yield }
end
deferrable
end
def ensure_supported_client_id(check_client_id)
unless check_client_id
- raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave presence channel without a client_id', 400, 40012)
+ raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave presence channel without a client_id')
end
if check_client_id == '*'
- raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave presence channel with the reserved wildcard client_id', 400, 40012)
+ raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave presence channel with the reserved wildcard client_id')
end
+ unless check_client_id.kind_of?(String)
+ raise Ably::Exceptions::IncompatibleClientId.new('Unable to enter/update/leave with a non String client_id value')
+ end
unless client.auth.can_assume_client_id?(check_client_id)
- raise Ably::Exceptions::IncompatibleClientId.new("Cannot enter with provided client_id '#{check_client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'", 400, 40012)
+ raise Ably::Exceptions::IncompatibleClientId.new("Cannot enter with provided client_id '#{check_client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
end
end
def send_protocol_message_and_transition_state_to(action, options = {}, &success_block)
deferrable = options.fetch(:deferrable) { raise ArgumentError, 'option :deferrable is required' }
@@ -407,27 +421,38 @@
EventMachine.next_tick { deferrable.fail(*args) } # allow errback to be added to the returned Deferrable
deferrable
end
def send_presence_action_for_client(action, client_id, data, &success_block)
- ensure_presence_publishable_on_connection
+ requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable
+ return requirements_failed_deferrable if requirements_failed_deferrable
deferrable = create_deferrable
ensure_channel_attached(deferrable) do
send_presence_protocol_message(action, client_id, data).tap do |protocol_message|
protocol_message.callback { |message| deferrable_succeed deferrable, &success_block }
protocol_message.errback { |error| deferrable_fail deferrable, error }
end
end
end
- def attach_channel_then
+ def attach_channel_then(deferrable)
if channel.detached? || channel.failed?
- raise Ably::Exceptions::InvalidStateChange.new("Operation is not allowed when channel is in #{channel.state}", 400, 91001)
+ deferrable.fail Ably::Exceptions::InvalidState.new("Operation is not allowed when channel is in #{channel.state}", 400, 91001)
else
- channel.unsafe_once(Channel::STATE.Attached) { yield }
+ channel.unsafe_once(:attached, :detached, :failed) do |channel_state_change|
+ if channel_state_change.current == :attached
+ yield
+ else
+ deferrable.fail Ably::Exceptions::InvalidState.new("Operation failed as channel transitioned to #{channel_state_change.current}", 400, 91001)
+ end
+ end
channel.attach
end
+ end
+
+ def implicit_attach
+ channel.attach if channel.initialized?
end
def client
channel.client
end