lib/ably/realtime/presence.rb in ably-0.6.2 vs lib/ably/realtime/presence.rb in ably-0.7.0
- old
+ new
@@ -14,131 +14,234 @@
:failed
)
include Ably::Modules::StateEmitter
# {Ably::Realtime::Channel} this Presence object is associated with
- # @return {Ably::Realtime::Channel}
+ # @return [Ably::Realtime::Channel]
attr_reader :channel
- # A unique member identifier for this channel client, disambiguating situations where a given
- # client_id is present on multiple connections simultaneously.
- #
- # @note TODO: This does not work at present as no ACK is sent from the server with a memberId
- # @return {String}
- attr_reader :member_id
+ # A unique identifier for this channel client based on their connection, disambiguating situations
+ # where a given client_id is present on multiple connections simultaneously.
+ # @return [String]
+ attr_reader :connection_id
+ # The client_id for the member present on this channel
+ # @return [String]
+ attr_reader :client_id
+
+ # The data for the member present on this channel
+ # @return [String]
+ attr_reader :data
+
def initialize(channel)
@channel = channel
@state = STATE.Initialized
@members = Hash.new
@subscriptions = Hash.new { |hash, key| hash[key] = [] }
@client_id = client.client_id
- @data = nil
- @member_id = nil
setup_event_handlers
end
# Enter this client into this channel. This client will be added to the presence set
# and presence subscribers will see an enter message for this client.
#
- # @param [Hash,String] options an options Hash to specify client data and/or client ID, or a String with the client data
+ # @param [Hash] options an options Hash to specify client data and/or client ID
# @option options [String] :data optional data (eg a status message) for this member
# @option options [String] :client_id the optional id of the client.
# This option is provided to support connections from server instances that act on behalf of
# multiple client_ids. In order to be able to enter the channel with this method, the client
# library must have been instanced either with a key, or with a token bound to the wildcard clientId.
#
- # @yield [Ably::Realtime::Presence] On success, will call the block with the {Ably::Realtime::Presence}
+ # @yield [Ably::Realtime::Presence] On success, will call the block with this {Ably::Realtime::Presence} object
+ # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
#
- # @return [Ably::Models::PresenceMessage] Deferrable {Ably::Models::PresenceMessage} that supports both success (callback) and failure (errback) callbacks
- #
- def enter(options = {}, &blk)
+ def enter(options = {}, &success_block)
@client_id = options.fetch(:client_id, client_id)
@data = options.fetch(:data, data)
+ deferrable = EventMachine::DefaultDeferrable.new
raise Ably::Exceptions::Standard.new('Unable to enter presence channel without a client_id', 400, 91000) unless client_id
+ return deferrable_succeed(deferrable, &success_block) if state == STATE.Entered
- if state == STATE.Entered
- blk.call self if block_given?
- return
- end
-
- ensure_channel_attached do
- once(STATE.Entered) { blk.call self } if block_given?
-
- if !entering?
- change_state STATE.Entering
- send_presence_protocol_message(Ably::Models::PresenceMessage::ACTION.Enter).tap do |deferrable|
- deferrable.errback { |message, error| change_state STATE.Failed, error }
- deferrable.callback { |message| change_state STATE.Entered, message }
+ ensure_channel_attached(deferrable) do
+ if entering?
+ once_or_if(STATE.Entered, else: proc { |args| deferrable_fail deferrable, *args }) do
+ deferrable_succeed deferrable, &success_block
end
+ else
+ change_state STATE.Entering
+ send_protocol_message_and_transition_state_to(
+ Ably::Models::PresenceMessage::ACTION.Enter,
+ deferrable: deferrable,
+ target_state: STATE.Entered,
+ client_id: client_id,
+ data: data,
+ failed_state: STATE.Failed,
+ &success_block
+ )
end
end
end
+ # Enter the specified client_id into this channel. The given client will be added to the
+ # presence set and presence subscribers will see a corresponding presence message.
+ # This method is provided to support connections (e.g. connections from application
+ # server instances) that act on behalf of multiple client_ids. In order to be able to
+ # enter the channel with this method, the client library must have been instanced
+ # either with a key, or with a token bound to the wildcard client_id
+ #
+ # @param [String] client_id id of the client
+ #
+ # @param [Hash] options an options Hash for this client event
+ # @option options [String] :data optional data (eg a status message) for this member
+ #
+ # @yield [Ably::Realtime::Presence] On success, will call the block with this {Ably::Realtime::Presence} object
+ # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
+ #
+ def enter_client(client_id, options = {}, &success_block)
+ raise ArgumentError, 'options must be a Hash' unless options.kind_of?(Hash)
+ raise Ably::Exceptions::Standard.new('Unable to enter presence channel without a client_id', 400, 91000) unless client_id
+
+ send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, options, &success_block)
+ end
+
# Leave this client from this channel. This client will be removed from the presence
# set and presence subscribers will see a leave message for this client.
#
- # @param (see Presence#enter)
+ # @param [Hash,String] options an options Hash to specify client data and/or client ID
+ # @option options [String] :data optional data (eg a status message) for this member
+ #
# @yield (see Presence#enter)
# @return (see Presence#enter)
#
- def leave(options = {}, &blk)
- raise Ably::Exceptions::Standard.new('Unable to leave presence channel that is not entered', 400, 91002) unless ably_to_leave?
+ def leave(options = {}, &success_block)
+ @data = options.fetch(:data) if options.has_key?(:data)
+ deferrable = EventMachine::DefaultDeferrable.new
- @data = options.fetch(:data, data)
+ raise Ably::Exceptions::Standard.new('Unable to leave presence channel that is not entered', 400, 91002) unless able_to_leave?
+ return deferrable_succeed(deferrable, &success_block) if state == STATE.Left
- if state == STATE.Left
- blk.call self if block_given?
- return
- end
-
- ensure_channel_attached do
- once(STATE.Left) { blk.call self } if block_given?
-
- if !leaving?
- change_state STATE.Leaving
- send_presence_protocol_message(Ably::Models::PresenceMessage::ACTION.Leave).tap do |deferrable|
- deferrable.errback { |message, error| change_state STATE.Failed, error }
- deferrable.callback { |message| change_state STATE.Left }
+ ensure_channel_attached(deferrable) do
+ if leaving?
+ once_or_if(STATE.Left, else: proc { |error|deferrable_fail deferrable, *args }) do
+ deferrable_succeed deferrable, &success_block
end
+ else
+ change_state STATE.Leaving
+ send_protocol_message_and_transition_state_to(
+ Ably::Models::PresenceMessage::ACTION.Leave,
+ deferrable: deferrable,
+ target_state: STATE.Left,
+ client_id: client_id,
+ data: data,
+ failed_state: STATE.Failed,
+ &success_block
+ )
end
end
end
+ # Leave a given client_id from this channel. This client will be removed from the
+ # presence set and presence subscribers will see a leave message for this client.
+ #
+ # @param (see Presence#enter_client)
+ # @option options (see Presence#enter_client)
+ #
+ # @yield (see Presence#enter_client)
+ # @return (see Presence#enter_client)
+ #
+ def leave_client(client_id, options = {}, &success_block)
+ raise ArgumentError, 'options must be a Hash' unless options.kind_of?(Hash)
+ raise Ably::Exceptions::Standard.new('Unable to leave presence channel without a client_id', 400, 91000) unless client_id
+
+ send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Leave, client_id, options, &success_block)
+ end
+
# Update the presence data for this client. If the client is not already a member of
# the presence set it will be added, and presence subscribers will see an enter or
# update message for this client.
#
- # @param (see Presence#enter)
+ # @param [Hash,String] options an options Hash to specify client data
+ # @option options [String] :data optional data (eg a status message) for this member
+ #
# @yield (see Presence#enter)
# @return (see Presence#enter)
#
- def update(options = {}, &blk)
- @data = options.fetch(:data, data)
+ def update(options = {}, &success_block)
+ @data = options.fetch(:data) if options.has_key?(:data)
+ deferrable = EventMachine::DefaultDeferrable.new
- ensure_channel_attached do
- send_presence_protocol_message(Ably::Models::PresenceMessage::ACTION.Update).tap do |deferrable|
- deferrable.callback do |message|
- change_state STATE.Entered, message unless entered?
- blk.call self if block_given?
- end
- end
+ ensure_channel_attached(deferrable) do
+ send_protocol_message_and_transition_state_to(
+ Ably::Models::PresenceMessage::ACTION.Update,
+ deferrable: deferrable,
+ target_state: STATE.Entered,
+ client_id: client_id,
+ data: data,
+ &success_block
+ )
end
end
+ # Update the presence data for a specified client_id into this channel.
+ # If the client is not already a member of the presence set it will be added, and
+ # presence subscribers will see an enter or update message for this client.
+ # As with {#enter_client}, the connection must be authenticated in a way that
+ # enables it to represent an arbitrary clientId.
+ #
+ # @param (see Presence#enter_client)
+ # @option options (see Presence#enter_client)
+ #
+ # @yield (see Presence#enter_client)
+ # @return (see Presence#enter_client)
+ #
+ def update_client(client_id, options = {}, &success_block)
+ raise ArgumentError, 'options must be a Hash' unless options.kind_of?(Hash)
+ raise Ably::Exceptions::Standard.new('Unable to enter presence channel without a client_id', 400, 91000) unless client_id
+
+ send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Update, client_id, options, &success_block)
+ end
+
# Get the presence state for this Channel.
- # Optionally get a member's {Ably::Models::PresenceMessage} state by member_id
#
- # @return [Array<Ably::Models::PresenceMessage>, Ably::Models::PresenceMessage] members on the channel
+ # @param [Hash,String] options an options Hash to filter members
+ # @option options [String] :client_id optional client_id for the member
+ # @option options [String] :connection_id optional connection_id for the member
+ # @option options [String] :wait_for_sync defaults to true, if false the get method returns the current list of members and does not wait for the presence sync to complete
#
- def get(member_id = nil)
- if member_id
- members.find { |key, presence| presence.member_id == member_id }
- else
- members.map { |key, presence| presence }
+ # @yield [Array<Ably::Models::PresenceMessage>] array of members or the member
+ #
+ # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callback
+ #
+ def get(options = {}, &success_block)
+ wait_for_sync = options.fetch(:wait_for_sync, true)
+ deferrable = EventMachine::DefaultDeferrable.new
+
+ ensure_channel_attached(deferrable) do
+ result_block = proc do
+ members.map { |key, presence| presence }.tap do |filtered_members|
+ filtered_members.keep_if { |presence| presence.connection_id == options[:connection_id] } if options[:connection_id]
+ filtered_members.keep_if { |presence| presence.client_id == options[:client_id] } if options[:client_id]
+ end
+ end
+
+ if !wait_for_sync || sync_complete?
+ result = result_block.call
+ success_block.call result if block_given?
+ deferrable.succeed result
+ else
+ sync_pubsub.once(:done) do
+ result = result_block.call
+ success_block.call result if block_given?
+ deferrable.succeed result
+ end
+
+ sync_pubsub.once(:failed) do |error|
+ deferrable.fail error
+ end
+ end
end
end
# Subscribe to presence events on the associated Channel.
# This implicitly attaches the Channel if it is not already attached.
@@ -146,31 +249,31 @@
# @param action [Ably::Models::PresenceMessage::ACTION] Optional, the state change action to subscribe to. Defaults to all presence actions
# @yield [Ably::Models::PresenceMessage] For each presence state change event, the block is called
#
# @return [void]
#
- def subscribe(action = :all, &blk)
+ def subscribe(action = :all, &callback)
ensure_channel_attached do
- subscriptions[message_action_key(action)] << blk
+ subscriptions[message_action_key(action)] << callback
end
end
# Unsubscribe the matching block for presence events on the associated Channel.
# If a block is not provided, all subscriptions will be unsubscribed
#
# @param action [Ably::Models::PresenceMessage::ACTION] Optional, the state change action to subscribe to. Defaults to all presence actions
#
# @return [void]
#
- def unsubscribe(action = :all, &blk)
+ def unsubscribe(action = :all, &callback)
if message_action_key(action) == :all
subscriptions.keys
else
Array(message_action_key(action))
end.each do |key|
subscriptions[key].delete_if do |block|
- !block_given? || blk == block
+ !block_given? || callback == block
end
end
end
# Return the presence messages history for the channel
@@ -186,33 +289,102 @@
async_wrap(callback) do
rest_presence.history(options.merge(async_blocking_operations: true))
end
end
+ # When attaching to a channel that has members present, the client and server
+ # initiate a sync automatically so that the client has a complete list of members.
+ #
+ # Whilst this sync is happening, this method returns false
+ #
+ # @return [Boolean]
+ def sync_complete?
+ sync_complete
+ end
+
+ # Expect SYNC ProtocolMessages with a list of current members on this channel from the server
+ #
+ # @return [void]
+ #
+ # @api private
+ def sync_started
+ @sync_complete = false
+
+ sync_pubsub.once(:sync_complete) do
+ sync_changes_backlog.each do |presence_message|
+ apply_member_presence_changes presence_message
+ end
+ sync_completed
+ sync_pubsub.trigger :done
+ end
+
+ channel.once_or_if [:detached, :failed] do |error|
+ sync_completed
+ sync_pubsub.trigger :failed, error
+ end
+ end
+
+ # The server has indicated that no members are present on this channel and no SYNC is expected,
+ # or that the SYNC has now completed
+ #
+ # @return [void]
+ #
+ # @api private
+ def sync_completed
+ @sync_complete = true
+ @sync_changes_backlog = []
+ end
+
+ # Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed.
+ # If the serial is nil, or the part after the first : is empty, then the SYNC is complete
+ #
+ # @return [void]
+ #
+ # @api private
+ def update_sync_serial(serial)
+ @sync_serial = serial
+ sync_pubsub.trigger :sync_complete if sync_serial_cursor_at_end?
+ end
+
# @!attribute [r] __incoming_msgbus__
# @return [Ably::Util::PubSub] Client library internal channel incoming message bus
# @api private
def __incoming_msgbus__
@__incoming_msgbus__ ||= Ably::Util::PubSub.new(
coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) }
)
end
private
- attr_reader :members, :subscriptions, :client_id, :data
+ attr_reader :members, :subscriptions, :sync_serial, :sync_complete
- def ably_to_leave?
+
+ # A simple PubSub class used to publish synchronisation state changes
+ def sync_pubsub
+ @sync_pubsub ||= Ably::Util::PubSub.new
+ end
+
+ # During a SYNC of presence members, all enter, update and leave events are queued for processing once the SYNC is complete
+ def sync_changes_backlog
+ @sync_changes_backlog ||= []
+ end
+
+ # When channel serial in ProtocolMessage SYNC is nil or
+ # an empty cursor appears after the ':' such as 'cf30e75054887:psl_7g:client:189'
+ # then there are no more SYNC messages to come
+ def sync_serial_cursor_at_end?
+ sync_serial.nil? || sync_serial.to_s.match(/^[\w-]+:?$/)
+ end
+
+ def able_to_leave?
entering? || entered?
end
def setup_event_handlers
- __incoming_msgbus__.subscribe(:presence) do |presence|
- presence.decode self.channel
- update_members_from_presence_message presence
-
- subscriptions[:all].each { |cb| cb.call(presence) }
- subscriptions[presence.action].each { |cb| cb.call(presence) }
+ __incoming_msgbus__.subscribe(:presence, :sync) do |presence_message|
+ presence_message.decode self.channel
+ update_members_from_presence_message presence_message
end
channel.on(Channel::STATE.Detaching) do
change_state STATE.Leaving
end
@@ -224,17 +396,17 @@
channel.on(Channel::STATE.Failed) do
change_state STATE.Failed unless left? || initialized?
end
on(STATE.Entered) do |message|
- @member_id = message.member_id
+ @connection_id = message.connection_id
end
end
# @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added
- def send_presence_protocol_message(presence_action)
- presence_message = create_presence_message(presence_action)
+ def send_presence_protocol_message(presence_action, client_id, options = {})
+ presence_message = create_presence_message(presence_action, client_id, options)
unless presence_message.client_id
raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, 91000)
end
protocol_message = {
@@ -246,46 +418,118 @@
client.connection.send_protocol_message protocol_message
presence_message
end
- def create_presence_message(action)
+ def create_presence_message(action, client_id, options = {})
model = {
action: Ably::Models::PresenceMessage.ACTION(action).to_i,
- clientId: client_id,
+ clientId: client_id
}
- model.merge!(data: data) if data
+ model.merge!(data: options.fetch(:data)) if options.has_key?(:data)
Ably::Models::PresenceMessage.new(model, nil).tap do |presence_message|
presence_message.encode self.channel
end
end
def update_members_from_presence_message(presence_message)
- unless presence_message.member_id
- new Ably::Exceptions::ProtocolError.new("Protocol error, presence message is missing memberId", 400, 80013)
+ unless presence_message.connection_id
+ Ably::Exceptions::ProtocolError.new("Protocol error, presence message is missing connectionId", 400, 80013)
end
- case presence_message.action
- when Ably::Models::PresenceMessage::ACTION.Enter
- members[presence_message.member_id] = presence_message
+ if sync_complete?
+ apply_member_presence_changes presence_message
+ else
+ if presence_message.action == Ably::Models::PresenceMessage::ACTION.Present
+ add_presence_member presence_message
+ publish_presence_member_state_change presence_message
+ else
+ sync_changes_backlog << presence_message
+ end
+ end
+ end
- when Ably::Models::PresenceMessage::ACTION.Update
- members[presence_message.member_id] = presence_message
-
+ def apply_member_presence_changes(presence_message)
+ case presence_message.action
+ when Ably::Models::PresenceMessage::ACTION.Enter, Ably::Models::PresenceMessage::ACTION.Update
+ add_presence_member presence_message
when Ably::Models::PresenceMessage::ACTION.Leave
- members.delete presence_message.member_id
-
+ remove_presence_member presence_message
else
- new Ably::Exceptions::ProtocolError.new("Protocol error, unknown presence action #{presence.action}", 400, 80013)
+ Ably::Exceptions::ProtocolError.new("Protocol error, unknown presence action #{presence_message.action}", 400, 80013)
end
+
+ publish_presence_member_state_change presence_message
end
- def ensure_channel_attached
+ def add_presence_member(presence_message)
+ members[presence_message.member_key] = presence_message
+ end
+
+ def remove_presence_member(presence_message)
+ members.delete presence_message.member_key
+ end
+
+ def publish_presence_member_state_change(presence_message)
+ subscriptions[:all].each { |cb| cb.call(presence_message) }
+ subscriptions[presence_message.action].each { |cb| cb.call(presence_message) }
+ end
+
+ def ensure_channel_attached(deferrable = nil)
if channel.attached?
yield
else
attach_channel_then { yield }
+ end
+ deferrable
+ end
+
+ def send_protocol_message_and_transition_state_to(action, options = {}, &success_block)
+ deferrable = options.fetch(:deferrable) { raise ArgumentError, 'option :deferrable is required' }
+ client_id = options.fetch(:client_id) { raise ArgumentError, 'option :client_id is required' }
+ target_state = options.fetch(:target_state, nil)
+ failed_state = options.fetch(:failed_state, nil)
+
+ protocol_message_options = if options.has_key?(:data)
+ { data: options.fetch(:data) }
+ else
+ { }
+ end
+
+ send_presence_protocol_message(action, client_id, protocol_message_options).tap do |protocol_message|
+ protocol_message.callback do |message|
+ change_state target_state, message if target_state
+ deferrable_succeed deferrable, &success_block
+ end
+
+ protocol_message.errback do |message, error|
+ change_state failed_state, error if failed_state
+ deferrable_fail deferrable, error
+ end
+ end
+ end
+
+ def deferrable_succeed(deferrable, *args, &block)
+ block.call self, *args if block_given?
+ EventMachine.next_tick { deferrable.succeed self, *args } # allow callback to be added to the returned Deferrable
+ deferrable
+ end
+
+ def deferrable_fail(deferrable, *args, &block)
+ block.call self, *args if block_given?
+ EventMachine.next_tick { deferrable.fail self, *args } # allow errback to be added to the returned Deferrable
+ deferrable
+ end
+
+ def send_presence_action_for_client(action, client_id, options = {}, &success_block)
+ deferrable = EventMachine::DefaultDeferrable.new
+
+ ensure_channel_attached(deferrable) do
+ send_presence_protocol_message(action, client_id, options).tap do |protocol_message|
+ protocol_message.callback { |message| deferrable_succeed deferrable, &success_block }
+ protocol_message.errback { |message| deferrable_fail deferrable }
+ end
end
end
def attach_channel_then
if channel.detached? || channel.failed?