# encoding: utf-8
require 'spec_helper'
describe Ably::Realtime::Presence, :event_machine do
+ include Ably::Modules::Conversions
vary_by_protocol do
let(:default_options) { { api_key: api_key, environment: environment, protocol: protocol } }
let(:client_options) { default_options }
let(:anonymous_client) { Ably::Realtime::Client.new(client_options) }
@@ -18,25 +20,117 @@
let(:presence_client_one) { channel_client_one.presence }
let(:channel_client_two) { client_two.channel(channel_name) }
let(:presence_client_two) { channel_client_two.presence }
let(:data_payload) { random_str }
+ def force_connection_failure(client)
+ # Prevent any further SYNC messages coming in on this connection
+ client.connection.transport.send(:driver).remove_all_listeners('message')
+ client.connection.transport.unbind
+ end
+ shared_examples_for 'a public presence method' do |method_name, expected_state, args, options = {}|
+ def setup_test(method_name, args, options)
+ if options[:enter_first]
+ presence_client_one.public_send(method_name.to_s.gsub(/leave|update/, 'enter'), args) do
+ yield
+ end
+ else
+ yield
+ end
+ end
+ unless expected_state == :left
+ %w(detached failed).each do |state|
+ it "raise an exception if the channel is #{state}" do
+ setup_test(method_name, args, options) do
+ channel_client_one.attach do
+ channel_client_one.change_state state.to_sym
+ expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::IncompatibleStateForOperation, /Operation is not allowed when channel is in STATE.#{state}/i
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ it 'returns a SafeDeferrable that catches exceptions in callbacks and logs them' do
+ setup_test(method_name, args, options) do
+ expect(presence_client_one.public_send(method_name, args)).to be_a(Ably::Util::SafeDeferrable)
+ stop_reactor
+ end
+ end
+ it 'calls the Deferrable callback on success' do
+ setup_test(method_name, args, options) do
+ presence_client_one.public_send(method_name, args).callback do |presence|
+ expect(presence).to eql(presence_client_one)
+ expect(presence_client_one.state).to eq(expected_state) if expected_state
+ stop_reactor
+ end
+ end
+ end
+ it 'catches exceptions in the provided method block and logs them to the logger' do
+ setup_test(method_name, args, options) do
+ expect(presence_client_one.logger).to receive(:error).with(/Intentional exception/) do
+ stop_reactor
+ end
+ presence_client_one.public_send(method_name, args) { raise 'Intentional exception' }
+ end
+ end
+ context 'if connection fails before success' do
+ before do
+ # Reconfigure client library so that it makes no retry attempts and fails immediately
+ stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG',
+ Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge(
+ disconnected: { retry_every: 0.1, max_time_in_state: 0 },
+ suspended: { retry_every: 0.1, max_time_in_state: 0 }
+ )
+ end
+ let(:client_options) { default_options.merge(log_level: :none) }
+ it 'calls the Deferrable errback if channel is detached' do
+ setup_test(method_name, args, options) do
+ channel_client_one.attach do
+ client_one.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ # Don't allow any messages to reach the server
+ client_one.connection.__outgoing_protocol_msgbus__.unsubscribe
+ force_connection_failure client_one
+ end
+ presence_client_one.public_send(method_name, args).tap do |deferrable|
+ deferrable.callback { raise 'Should not succeed' }
+ deferrable.errback do |presence, error|
+ expect(presence).to be_a(Ably::Realtime::Presence)
+ expect(error).to be_kind_of(Ably::Exceptions::BaseAblyException)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+ end
context 'when attached (but not present) on a presence channel with an anonymous client (no client ID)' do
it 'maintains state as other clients enter and leave the channel' do
channel_anonymous_client.attach do
presence_anonymous_client.subscribe(:enter) do |presence_message|
expect(presence_message.client_id).to eql(client_one.client_id)
presence_anonymous_client.get do |members|
expect(members.first.client_id).to eql(client_one.client_id)
expect(members.first.action).to eq(:enter)
- presence_anonymous_client.subscribe(:leave) do |presence_message|
- expect(presence_message.client_id).to eql(client_one.client_id)
+ presence_anonymous_client.subscribe(:leave) do |leave_presence_message|
+ expect(leave_presence_message.client_id).to eql(client_one.client_id)
- presence_anonymous_client.get do |members|
- expect(members.count).to eql(0)
+ presence_anonymous_client.get do |members_once_left|
+ expect(members_once_left.count).to eql(0)
@@ -46,10 +140,53 @@
+ context '#members map', api_private: true do
+ it 'is available once the channel is created' do
+ expect(presence_anonymous_client.members).to_not be_nil
+ stop_reactor
+ end
+ it 'is not synchronised when initially created' do
+ expect(presence_anonymous_client.members).to_not be_sync_complete
+ stop_reactor
+ end
+ it 'will trigger an :in_sync event when synchronisation is complete' do
+ presence_client_one.enter
+ presence_client_two.enter
+ presence_anonymous_client.members.once(:in_sync) do
+ stop_reactor
+ end
+ end
+ context 'before server sync complete' do
+ it 'behaves like an Enumerable allowing direct access to current members' do
+ expect(presence_anonymous_client.members.count).to eql(0)
+ expect(presence_anonymous_client.members.map(&:member_key)).to eql([])
+ stop_reactor
+ end
+ end
+ context 'once server sync is complete' do
+ it 'behaves like an Enumerable allowing direct access to current members' do
+ when_all(presence_client_one.enter, presence_client_two.enter) do
+ presence_anonymous_client.members.once(:in_sync) do
+ expect(presence_anonymous_client.members.count).to eql(2)
+ member_ids = presence_anonymous_client.members.map(&:member_key)
+ expect(member_ids.count).to eql(2)
+ expect(member_ids.uniq.count).to eql(2)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
context '#sync_complete?' do
context 'when attaching to a channel without any members present' do
it 'is true and the presence channel is considered synced immediately' do
channel_anonymous_client.attach do
expect(channel_anonymous_client.presence).to be_sync_complete
@@ -71,37 +208,166 @@
- context 'when the SYNC of a presence channel spans multiple ProtocolMessage messages' do
- context 'with 250 existing (present) members' do
+ context '250 existing (present) members on a channel (3 SYNC pages)' do
+ context 'requires at least 3 SYNC ProtocolMessages' do
let(:enter_expected_count) { 250 }
let(:present) { [] }
let(:entered) { [] }
+ let(:sync_pages_received) { [] }
- context 'when a new client attaches to the presence channel', em_timeout: 10 do
+ def setup_members_on(presence)
+ enter_expected_count.times do |index|
+ presence.enter_client("client:#{index}") do |message|
+ entered << message
+ next unless entered.count == enter_expected_count
+ yield
+ end
+ end
+ end
+ context 'when a client attaches to the presence channel', em_timeout: 10 do
it 'emits :present for each member' do
- enter_expected_count.times do |index|
- presence_client_one.enter_client("client:#{index}") do |message|
- entered << message
- next unless entered.count == enter_expected_count
+ setup_members_on(presence_client_one) do
+ presence_anonymous_client.subscribe(:present) do |present_message|
+ expect(present_message.action).to eq(:present)
+ present << present_message
+ next unless present.count == enter_expected_count
+ expect(present.map(&:client_id).uniq.count).to eql(enter_expected_count)
+ stop_reactor
+ end
+ end
+ end
+ context 'and a member leaves before the SYNC operation is complete' do
+ it 'emits :leave immediately as the member leaves' do
+ all_client_ids = enter_expected_count.times.map { |id| "client:#{id}" }
+ setup_members_on(presence_client_one) do
+ leave_member = nil
presence_anonymous_client.subscribe(:present) do |present_message|
- expect(present_message.action).to eq(:present)
present << present_message
- next unless present.count == enter_expected_count
+ all_client_ids.delete(present_message.client_id)
+ end
- expect(present.map(&:client_id).uniq.count).to eql(enter_expected_count)
+ presence_anonymous_client.subscribe(:leave) do |leave_message|
+ expect(leave_message.client_id).to eql(leave_member.client_id)
+ expect(present.count).to be < enter_expected_count
+ anonymous_client.connect do
+ anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :sync
+ sync_pages_received << protocol_message
+ if sync_pages_received.count == 1
+ leave_action = Ably::Models::PresenceMessage::ACTION.Leave
+ leave_member = Ably::Models::PresenceMessage.new(
+ 'id' => "#{client_one.connection.id}-#{all_client_ids.first}:0",
+ 'clientId' => all_client_ids.first,
+ 'connectionId' => client_one.connection.id,
+ 'timestamp' => as_since_epoch(Time.now),
+ 'action' => leave_action
+ )
+ presence_anonymous_client.__incoming_msgbus__.publish :presence, leave_member
+ end
+ end
+ end
+ end
+ it 'ignores presence events with timestamps prior to the current :present event in the MembersMap' do
+ started_at = Time.now
+ setup_members_on(presence_client_one) do
+ leave_member = nil
+ presence_anonymous_client.subscribe(:present) do |present_message|
+ present << present_message
+ leave_member = present_message unless leave_member
+ if present.count == enter_expected_count
+ presence_anonymous_client.get do |members|
+ expect(members.find { |member| member.client_id == leave_member.client_id}.action).to eq(:present)
+ stop_reactor
+ end
+ end
+ end
+ presence_anonymous_client.subscribe(:leave) do |leave_message|
+ raise 'Leave event should not have been fired because it is out of date'
+ end
+ anonymous_client.connect do
+ anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :sync
+ sync_pages_received << protocol_message
+ if sync_pages_received.count == 1
+ leave_action = Ably::Models::PresenceMessage::ACTION.Leave
+ leave_member = Ably::Models::PresenceMessage.new(
+ leave_member.as_json.merge('action' => leave_action, 'timestamp' => as_since_epoch(started_at))
+ )
+ presence_anonymous_client.__incoming_msgbus__.publish :presence, leave_member
+ end
+ end
+ end
+ end
+ end
+ end
+ it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get' do
+ left_client = 10
+ left_client_id = "client:#{left_client}"
+ setup_members_on(presence_client_one) do
+ member_left_emitted = false
+ presence_anonymous_client.subscribe(:present) do |present_message|
+ if present_message.client_id == left_client_id
+ raise "Member #{present_message.client_id} should not have been emitted as present"
+ end
+ present << present_message.client_id
+ end
+ presence_anonymous_client.subscribe(:leave) do |leave_message|
+ if present.include?(leave_message.client_id)
+ raise "Member #{leave_message.client_id} should not have been emitted as present previously"
+ end
+ expect(leave_message.client_id).to eql(left_client_id)
+ member_left_emitted = true
+ end
+ presence_anonymous_client.get do |members|
+ expect(members.count).to eql(enter_expected_count - 1)
+ expect(member_left_emitted).to eql(true)
+ expect(members.map(&:client_id)).to_not include(left_client_id)
+ stop_reactor
+ end
+ channel_anonymous_client.attach do
+ leave_action = Ably::Models::PresenceMessage::ACTION.Leave
+ fake_leave_presence_message = Ably::Models::PresenceMessage.new(
+ 'id' => "#{client_one.connection.id}-#{left_client_id}:0",
+ 'clientId' => left_client_id,
+ 'connectionId' => client_one.connection.id,
+ 'timestamp' => as_since_epoch(Time.now),
+ 'action' => leave_action
+ )
+ # Push out a LEAVE event directly to the Presence object before it's received the :present action via the SYNC ProtocolMessage
+ presence_anonymous_client.__incoming_msgbus__.publish :presence, fake_leave_presence_message
+ end
+ end
+ end
context '#get' do
- it '#waits until sync is complete', event_machine: 15 do
+ it 'waits until sync is complete', event_machine: 15 do
enter_expected_count.times do |index|
presence_client_one.enter_client("client:#{index}") do |message|
entered << message
next unless entered.count == enter_expected_count
@@ -117,11 +383,11 @@
context 'automatic attachment of channel on access to presence object' do
- it 'is implicit if presence state is initalized' do
+ it 'is implicit if presence state is initialized' do
channel_client_one.on(:attached) do
expect(channel_client_one.state).to eq(:attached)
@@ -199,27 +465,44 @@
+ context 'message #connection_id' do
+ it 'matches the current client connection_id' do
+ channel_client_two.attach do
+ presence_client_one.enter
+ end
+ presence_client_two.subscribe do |presence|
+ expect(presence.connection_id).to eq(client_one.connection.id)
+ stop_reactor
+ end
+ end
+ end
it 'raises an exception if client_id is not set' do
expect { channel_anonymous_client.presence.enter }.to raise_error(Ably::Exceptions::Standard, /without a client_id/)
- it 'returns a Deferrable' do
- expect(presence_client_one.enter).to be_a(EventMachine::Deferrable)
- stop_reactor
- end
+ context 'without necessary capabilities to join presence' do
+ let(:restricted_client) do
+ Ably::Realtime::Client.new(default_options.merge(api_key: restricted_api_key, log_level: :fatal))
+ end
+ let(:restricted_channel) { restricted_client.channel("cansubscribe:channel") }
+ let(:restricted_presence) { restricted_channel.presence }
- it 'calls the Deferrable callback on success' do
- presence_client_one.enter.callback do |presence|
- expect(presence).to eql(presence_client_one)
- expect(presence_client_one.state).to eq(:entered)
- stop_reactor
+ it 'calls the Deferrable errback on capabilities failure' do
+ restricted_presence.enter(client_id: 'clientId').tap do |deferrable|
+ deferrable.callback { raise "Should not succeed" }
+ deferrable.errback { stop_reactor }
+ end
+ it_should_behave_like 'a public presence method', :enter, :entered, {}
context '#update' do
it 'without previous #enter automatically enters' do
presence_client_one.update(data: data_payload) do
@@ -264,26 +547,11 @@
expect(message.data).to be_nil
- it 'returns a Deferrable' do
- presence_client_one.enter do
- expect(presence_client_one.update).to be_a(EventMachine::Deferrable)
- stop_reactor
- end
- end
- it 'calls the Deferrable callback on success' do
- presence_client_one.enter do
- presence_client_one.update.callback do |presence|
- expect(presence).to eql(presence_client_one)
- expect(presence_client_one.state).to eq(:entered)
- stop_reactor
- end
- end
- end
+ it_should_behave_like 'a public presence method', :update, :entered, {}, enter_first: true
context '#leave' do
context ':data option' do
let(:data) { random_str }
@@ -332,26 +600,11 @@
it 'raises an exception if not entered' do
expect { channel_anonymous_client.presence.leave }.to raise_error(Ably::Exceptions::Standard, /Unable to leave presence channel that is not entered/)
- it 'returns a Deferrable' do
- presence_client_one.enter do
- expect(presence_client_one.leave).to be_a(EventMachine::Deferrable)
- stop_reactor
- end
- end
- it 'calls the Deferrable callback on success' do
- presence_client_one.enter do
- presence_client_one.leave.callback do |presence|
- expect(presence).to eql(presence_client_one)
- expect(presence_client_one.state).to eq(:left)
- stop_reactor
- end
- end
- end
+ it_should_behave_like 'a public presence method', :leave, :left, {}, enter_first: true
context ':left event' do
it 'emits the data defined in enter' do
channel_client_one.presence.enter(data: 'data') do
@@ -413,20 +666,41 @@
- it 'returns a Deferrable' do
- expect(presence_client_one.enter_client('client_id')).to be_a(EventMachine::Deferrable)
- stop_reactor
+ context 'message #connection_id' do
+ let(:client_id) { random_str }
+ it 'matches the current client connection_id' do
+ channel_client_two.attach do
+ presence_client_one.enter_client(client_id)
+ end
+ presence_client_two.subscribe do |presence|
+ expect(presence.client_id).to eq(client_id)
+ expect(presence.connection_id).to eq(client_one.connection.id)
+ stop_reactor
+ end
+ end
- it 'calls the Deferrable callback on success' do
- presence_client_one.enter_client('client_id').callback do |presence|
- expect(presence).to eql(presence_client_one)
- stop_reactor
+ it_should_behave_like 'a public presence method', :enter_client, nil, 'client_id'
+ context 'without necessary capabilities to enter on behalf of another client' do
+ let(:restricted_client) do
+ Ably::Realtime::Client.new(default_options.merge(api_key: restricted_api_key, log_level: :fatal))
+ let(:restricted_channel) { restricted_client.channel("cansubscribe:channel") }
+ let(:restricted_presence) { restricted_channel.presence }
+ it 'calls the Deferrable errback on capabilities failure' do
+ restricted_presence.enter_client('clientId').tap do |deferrable|
+ deferrable.callback { raise "Should not succeed" }
+ deferrable.errback { stop_reactor }
+ end
+ end
context '#update_client' do
context 'multiple times on the same channel with different client_ids' do
@@ -487,21 +761,11 @@
- it 'returns a Deferrable' do
- expect(presence_client_one.update_client('client_id')).to be_a(EventMachine::Deferrable)
- stop_reactor
- end
- it 'calls the Deferrable callback on success' do
- presence_client_one.update_client('client_id').callback do |presence|
- expect(presence).to eql(presence_client_one)
- stop_reactor
- end
- end
+ it_should_behave_like 'a public presence method', :update_client, nil, 'client_id'
context '#leave_client' do
context 'leaves a channel' do
context 'multiple times on the same channel with different client_ids' do
@@ -590,37 +854,110 @@
- it 'returns a Deferrable' do
- expect(presence_client_one.leave_client('client_id')).to be_a(EventMachine::Deferrable)
- stop_reactor
- end
- it 'calls the Deferrable callback on success' do
- presence_client_one.leave_client('client_id').callback do |presence|
- expect(presence).to eql(presence_client_one)
- stop_reactor
- end
- end
+ it_should_behave_like 'a public presence method', :leave_client, nil, 'client_id'
context '#get' do
- it 'returns a Deferrable' do
- expect(presence_client_one.get).to be_a(EventMachine::Deferrable)
+ it 'returns a SafeDeferrable that catches exceptions in callbacks and logs them' do
+ expect(presence_client_one.get).to be_a(Ably::Util::SafeDeferrable)
it 'calls the Deferrable callback on success' do
presence_client_one.get.callback do |presence|
expect(presence).to eq([])
+ it 'catches exceptions in the provided method block' do
+ expect(presence_client_one.logger).to receive(:error).with(/Intentional exception/) do
+ stop_reactor
+ end
+ presence_client_one.get { raise 'Intentional exception' }
+ end
+ %w(detached failed).each do |state|
+ it "raise an exception if the channel is #{state}" do
+ channel_client_one.attach do
+ channel_client_one.change_state state.to_sym
+ expect { presence_client_one.get }.to raise_error Ably::Exceptions::IncompatibleStateForOperation, /Operation is not allowed when channel is in STATE.#{state}/i
+ stop_reactor
+ end
+ end
+ end
+ context 'during a sync' do
+ let(:pages) { 2 }
+ let(:members_per_page) { 100 }
+ let(:sync_pages_received) { [] }
+ let(:client_options) { default_options.merge(log_level: :none) }
+ def connect_members_deferrables
+ (members_per_page * pages + 1).times.map do |index|
+ presence_client_one.enter_client("client:#{index}")
+ end
+ end
+ before do
+ # Reconfigure client library so that it makes no retry attempts and fails immediately
+ stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG',
+ Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge(
+ disconnected: { retry_every: 0.1, max_time_in_state: 0 },
+ suspended: { retry_every: 0.1, max_time_in_state: 0 }
+ )
+ end
+ it 'fails if the connection fails' do
+ when_all(*connect_members_deferrables) do
+ channel_client_two.attach do
+ client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :sync
+ sync_pages_received << protocol_message
+ force_connection_failure client_two if sync_pages_received.count == 1
+ end
+ end
+ end
+ presence_client_two.get.tap do |deferrable|
+ deferrable.callback { raise 'Get should not succeed' }
+ deferrable.errback do |error|
+ stop_reactor
+ end
+ end
+ end
+ end
+ it 'fails if the channel is detached' do
+ when_all(*connect_members_deferrables) do
+ channel_client_two.attach do
+ client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :sync
+ # prevent any more SYNC messages coming through
+ client_two.connection.transport.__incoming_protocol_msgbus__.unsubscribe
+ channel_client_two.change_state :detaching
+ channel_client_two.change_state :detached
+ end
+ end
+ end
+ presence_client_two.get.tap do |deferrable|
+ deferrable.callback { raise 'Get should not succeed' }
+ deferrable.errback do |error|
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ # skip 'it fails if the connection changes to failed state'
it 'returns the current members on the channel' do
presence_client_one.enter do
presence_client_one.get do |members|
expect(members.count).to eq(1)
@@ -751,13 +1088,15 @@
expect(messages.map(&:action).map(&:to_sym)).to contain_exactly(:enter, :update, :leave)
- presence_client_one.enter
- presence_client_one.update
- presence_client_one.leave
+ presence_client_one.enter do
+ presence_client_one.update do
+ presence_client_one.leave
+ end
+ end
@@ -990,12 +1329,33 @@
- skip 'ensure connection_id is unique and updated on ENTER'
- skip 'ensure connection_id for presence member matches the messages they publish on the channel'
- skip 'stop a call to get when the channel has not been entered'
- skip 'stop a call to get when the channel has been entered but the list is not up to date'
- skip 'presence will resume sync if connection is dropped mid-way'
+ context 'connection failure mid-way through a large member sync' do
+ let(:members_count) { 400 }
+ let(:sync_pages_received) { [] }
+ # Will re-enable once https://github.com/ably/realtime/issues/91 is resolved
+ skip 'resumes the SYNC operation', em_timeout: 15 do
+ when_all(*members_count.times.map do |index|
+ presence_client_one.enter_client("client:#{index}")
+ end) do
+ channel_client_two.attach do
+ client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :sync
+ sync_pages_received << protocol_message
+ force_connection_failure client_two if sync_pages_received.count == 2
+ end
+ end
+ end
+ presence_client_two.get do |members|
+ expect(members.count).to eql(members_count)
+ expect(members.map(&:member_key).uniq.count).to eql(members_count)
+ stop_reactor
+ end
+ end
+ end
+ end