lib/submodules/ably-ruby/spec/acceptance/realtime/presence_spec.rb in ably-rest-0.7.3 vs lib/submodules/ably-ruby/spec/acceptance/realtime/presence_spec.rb in ably-rest-0.7.5

- old
+ new

@@ -1,9 +1,11 @@ # encoding: utf-8 require 'spec_helper' describe Ably::Realtime::Presence, :event_machine do + include Ably::Modules::Conversions + vary_by_protocol do let(:default_options) { { api_key: api_key, environment: environment, protocol: protocol } } let(:client_options) { default_options } let(:anonymous_client) { Ably::Realtime::Client.new(client_options) } @@ -18,25 +20,117 @@ let(:presence_client_one) { channel_client_one.presence } let(:channel_client_two) { client_two.channel(channel_name) } let(:presence_client_two) { channel_client_two.presence } let(:data_payload) { random_str } + def force_connection_failure(client) + # Prevent any further SYNC messages coming in on this connection + client.connection.transport.send(:driver).remove_all_listeners('message') + client.connection.transport.unbind + end + + shared_examples_for 'a public presence method' do |method_name, expected_state, args, options = {}| + def setup_test(method_name, args, options) + if options[:enter_first] + presence_client_one.public_send(method_name.to_s.gsub(/leave|update/, 'enter'), args) do + yield + end + else + yield + end + end + + unless expected_state == :left + %w(detached failed).each do |state| + it "raise an exception if the channel is #{state}" do + setup_test(method_name, args, options) do + channel_client_one.attach do + channel_client_one.change_state state.to_sym + expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::IncompatibleStateForOperation, /Operation is not allowed when channel is in STATE.#{state}/i + stop_reactor + end + end + end + end + end + + it 'returns a SafeDeferrable that catches exceptions in callbacks and logs them' do + setup_test(method_name, args, options) do + expect(presence_client_one.public_send(method_name, args)).to be_a(Ably::Util::SafeDeferrable) + stop_reactor + end + end + + it 'calls the Deferrable callback on success' do + setup_test(method_name, args, options) do + presence_client_one.public_send(method_name, args).callback do |presence| + expect(presence).to eql(presence_client_one) + expect(presence_client_one.state).to eq(expected_state) if expected_state + stop_reactor + end + end + end + + it 'catches exceptions in the provided method block and logs them to the logger' do + setup_test(method_name, args, options) do + expect(presence_client_one.logger).to receive(:error).with(/Intentional exception/) do + stop_reactor + end + presence_client_one.public_send(method_name, args) { raise 'Intentional exception' } + end + end + + context 'if connection fails before success' do + before do + # Reconfigure client library so that it makes no retry attempts and fails immediately + stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG', + Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge( + disconnected: { retry_every: 0.1, max_time_in_state: 0 }, + suspended: { retry_every: 0.1, max_time_in_state: 0 } + ) + end + + let(:client_options) { default_options.merge(log_level: :none) } + + it 'calls the Deferrable errback if channel is detached' do + setup_test(method_name, args, options) do + channel_client_one.attach do + client_one.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + # Don't allow any messages to reach the server + client_one.connection.__outgoing_protocol_msgbus__.unsubscribe + force_connection_failure client_one + end + + presence_client_one.public_send(method_name, args).tap do |deferrable| + deferrable.callback { raise 'Should not succeed' } + deferrable.errback do |presence, error| + expect(presence).to be_a(Ably::Realtime::Presence) + expect(error).to be_kind_of(Ably::Exceptions::BaseAblyException) + stop_reactor + end + end + end + end + end + end + end + context 'when attached (but not present) on a presence channel with an anonymous client (no client ID)' do it 'maintains state as other clients enter and leave the channel' do channel_anonymous_client.attach do presence_anonymous_client.subscribe(:enter) do |presence_message| expect(presence_message.client_id).to eql(client_one.client_id) presence_anonymous_client.get do |members| expect(members.first.client_id).to eql(client_one.client_id) expect(members.first.action).to eq(:enter) - presence_anonymous_client.subscribe(:leave) do |presence_message| - expect(presence_message.client_id).to eql(client_one.client_id) + presence_anonymous_client.subscribe(:leave) do |leave_presence_message| + expect(leave_presence_message.client_id).to eql(client_one.client_id) - presence_anonymous_client.get do |members| - expect(members.count).to eql(0) + presence_anonymous_client.get do |members_once_left| + expect(members_once_left.count).to eql(0) stop_reactor end end end end @@ -46,10 +140,53 @@ presence_client_one.leave end end end + context '#members map', api_private: true do + it 'is available once the channel is created' do + expect(presence_anonymous_client.members).to_not be_nil + stop_reactor + end + + it 'is not synchronised when initially created' do + expect(presence_anonymous_client.members).to_not be_sync_complete + stop_reactor + end + + it 'will trigger an :in_sync event when synchronisation is complete' do + presence_client_one.enter + presence_client_two.enter + + presence_anonymous_client.members.once(:in_sync) do + stop_reactor + end + end + + context 'before server sync complete' do + it 'behaves like an Enumerable allowing direct access to current members' do + expect(presence_anonymous_client.members.count).to eql(0) + expect(presence_anonymous_client.members.map(&:member_key)).to eql([]) + stop_reactor + end + end + + context 'once server sync is complete' do + it 'behaves like an Enumerable allowing direct access to current members' do + when_all(presence_client_one.enter, presence_client_two.enter) do + presence_anonymous_client.members.once(:in_sync) do + expect(presence_anonymous_client.members.count).to eql(2) + member_ids = presence_anonymous_client.members.map(&:member_key) + expect(member_ids.count).to eql(2) + expect(member_ids.uniq.count).to eql(2) + stop_reactor + end + end + end + end + end + context '#sync_complete?' do context 'when attaching to a channel without any members present' do it 'is true and the presence channel is considered synced immediately' do channel_anonymous_client.attach do expect(channel_anonymous_client.presence).to be_sync_complete @@ -71,37 +208,166 @@ end end end end - context 'when the SYNC of a presence channel spans multiple ProtocolMessage messages' do - context 'with 250 existing (present) members' do + context '250 existing (present) members on a channel (3 SYNC pages)' do + context 'requires at least 3 SYNC ProtocolMessages' do let(:enter_expected_count) { 250 } let(:present) { [] } let(:entered) { [] } + let(:sync_pages_received) { [] } - context 'when a new client attaches to the presence channel', em_timeout: 10 do + def setup_members_on(presence) + enter_expected_count.times do |index| + presence.enter_client("client:#{index}") do |message| + entered << message + next unless entered.count == enter_expected_count + yield + end + end + end + + context 'when a client attaches to the presence channel', em_timeout: 10 do it 'emits :present for each member' do - enter_expected_count.times do |index| - presence_client_one.enter_client("client:#{index}") do |message| - entered << message - next unless entered.count == enter_expected_count + setup_members_on(presence_client_one) do + presence_anonymous_client.subscribe(:present) do |present_message| + expect(present_message.action).to eq(:present) + present << present_message + next unless present.count == enter_expected_count + expect(present.map(&:client_id).uniq.count).to eql(enter_expected_count) + stop_reactor + end + end + end + + context 'and a member leaves before the SYNC operation is complete' do + it 'emits :leave immediately as the member leaves' do + all_client_ids = enter_expected_count.times.map { |id| "client:#{id}" } + + setup_members_on(presence_client_one) do + leave_member = nil + presence_anonymous_client.subscribe(:present) do |present_message| - expect(present_message.action).to eq(:present) present << present_message - next unless present.count == enter_expected_count + all_client_ids.delete(present_message.client_id) + end - expect(present.map(&:client_id).uniq.count).to eql(enter_expected_count) + presence_anonymous_client.subscribe(:leave) do |leave_message| + expect(leave_message.client_id).to eql(leave_member.client_id) + expect(present.count).to be < enter_expected_count stop_reactor end + + anonymous_client.connect do + anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + sync_pages_received << protocol_message + if sync_pages_received.count == 1 + leave_action = Ably::Models::PresenceMessage::ACTION.Leave + leave_member = Ably::Models::PresenceMessage.new( + 'id' => "#{client_one.connection.id}-#{all_client_ids.first}:0", + 'clientId' => all_client_ids.first, + 'connectionId' => client_one.connection.id, + 'timestamp' => as_since_epoch(Time.now), + 'action' => leave_action + ) + presence_anonymous_client.__incoming_msgbus__.publish :presence, leave_member + end + end + end + end end end + + it 'ignores presence events with timestamps prior to the current :present event in the MembersMap' do + started_at = Time.now + + setup_members_on(presence_client_one) do + leave_member = nil + + presence_anonymous_client.subscribe(:present) do |present_message| + present << present_message + leave_member = present_message unless leave_member + + if present.count == enter_expected_count + presence_anonymous_client.get do |members| + expect(members.find { |member| member.client_id == leave_member.client_id}.action).to eq(:present) + stop_reactor + end + end + end + + presence_anonymous_client.subscribe(:leave) do |leave_message| + raise 'Leave event should not have been fired because it is out of date' + end + + anonymous_client.connect do + anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + sync_pages_received << protocol_message + if sync_pages_received.count == 1 + leave_action = Ably::Models::PresenceMessage::ACTION.Leave + leave_member = Ably::Models::PresenceMessage.new( + leave_member.as_json.merge('action' => leave_action, 'timestamp' => as_since_epoch(started_at)) + ) + presence_anonymous_client.__incoming_msgbus__.publish :presence, leave_member + end + end + end + end + end + end + + it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get' do + left_client = 10 + left_client_id = "client:#{left_client}" + + setup_members_on(presence_client_one) do + member_left_emitted = false + + presence_anonymous_client.subscribe(:present) do |present_message| + if present_message.client_id == left_client_id + raise "Member #{present_message.client_id} should not have been emitted as present" + end + present << present_message.client_id + end + + presence_anonymous_client.subscribe(:leave) do |leave_message| + if present.include?(leave_message.client_id) + raise "Member #{leave_message.client_id} should not have been emitted as present previously" + end + expect(leave_message.client_id).to eql(left_client_id) + member_left_emitted = true + end + + presence_anonymous_client.get do |members| + expect(members.count).to eql(enter_expected_count - 1) + expect(member_left_emitted).to eql(true) + expect(members.map(&:client_id)).to_not include(left_client_id) + stop_reactor + end + + channel_anonymous_client.attach do + leave_action = Ably::Models::PresenceMessage::ACTION.Leave + fake_leave_presence_message = Ably::Models::PresenceMessage.new( + 'id' => "#{client_one.connection.id}-#{left_client_id}:0", + 'clientId' => left_client_id, + 'connectionId' => client_one.connection.id, + 'timestamp' => as_since_epoch(Time.now), + 'action' => leave_action + ) + # Push out a LEAVE event directly to the Presence object before it's received the :present action via the SYNC ProtocolMessage + presence_anonymous_client.__incoming_msgbus__.publish :presence, fake_leave_presence_message + end + end + end end context '#get' do - it '#waits until sync is complete', event_machine: 15 do + it 'waits until sync is complete', event_machine: 15 do enter_expected_count.times do |index| presence_client_one.enter_client("client:#{index}") do |message| entered << message next unless entered.count == enter_expected_count @@ -117,11 +383,11 @@ end end end context 'automatic attachment of channel on access to presence object' do - it 'is implicit if presence state is initalized' do + it 'is implicit if presence state is initialized' do channel_client_one.presence channel_client_one.on(:attached) do expect(channel_client_one.state).to eq(:attached) stop_reactor end @@ -199,27 +465,44 @@ end end end end + context 'message #connection_id' do + it 'matches the current client connection_id' do + channel_client_two.attach do + presence_client_one.enter + end + + presence_client_two.subscribe do |presence| + expect(presence.connection_id).to eq(client_one.connection.id) + stop_reactor + end + end + end + it 'raises an exception if client_id is not set' do expect { channel_anonymous_client.presence.enter }.to raise_error(Ably::Exceptions::Standard, /without a client_id/) stop_reactor end - it 'returns a Deferrable' do - expect(presence_client_one.enter).to be_a(EventMachine::Deferrable) - stop_reactor - end + context 'without necessary capabilities to join presence' do + let(:restricted_client) do + Ably::Realtime::Client.new(default_options.merge(api_key: restricted_api_key, log_level: :fatal)) + end + let(:restricted_channel) { restricted_client.channel("cansubscribe:channel") } + let(:restricted_presence) { restricted_channel.presence } - it 'calls the Deferrable callback on success' do - presence_client_one.enter.callback do |presence| - expect(presence).to eql(presence_client_one) - expect(presence_client_one.state).to eq(:entered) - stop_reactor + it 'calls the Deferrable errback on capabilities failure' do + restricted_presence.enter(client_id: 'clientId').tap do |deferrable| + deferrable.callback { raise "Should not succeed" } + deferrable.errback { stop_reactor } + end end end + + it_should_behave_like 'a public presence method', :enter, :entered, {} end context '#update' do it 'without previous #enter automatically enters' do presence_client_one.update(data: data_payload) do @@ -264,26 +547,11 @@ expect(message.data).to be_nil stop_reactor end end - it 'returns a Deferrable' do - presence_client_one.enter do - expect(presence_client_one.update).to be_a(EventMachine::Deferrable) - stop_reactor - end - end - - it 'calls the Deferrable callback on success' do - presence_client_one.enter do - presence_client_one.update.callback do |presence| - expect(presence).to eql(presence_client_one) - expect(presence_client_one.state).to eq(:entered) - stop_reactor - end - end - end + it_should_behave_like 'a public presence method', :update, :entered, {}, enter_first: true end context '#leave' do context ':data option' do let(:data) { random_str } @@ -332,26 +600,11 @@ it 'raises an exception if not entered' do expect { channel_anonymous_client.presence.leave }.to raise_error(Ably::Exceptions::Standard, /Unable to leave presence channel that is not entered/) stop_reactor end - it 'returns a Deferrable' do - presence_client_one.enter do - expect(presence_client_one.leave).to be_a(EventMachine::Deferrable) - stop_reactor - end - end - - it 'calls the Deferrable callback on success' do - presence_client_one.enter do - presence_client_one.leave.callback do |presence| - expect(presence).to eql(presence_client_one) - expect(presence_client_one.state).to eq(:left) - stop_reactor - end - end - end + it_should_behave_like 'a public presence method', :leave, :left, {}, enter_first: true end context ':left event' do it 'emits the data defined in enter' do channel_client_one.presence.enter(data: 'data') do @@ -413,20 +666,41 @@ stop_reactor end end end - it 'returns a Deferrable' do - expect(presence_client_one.enter_client('client_id')).to be_a(EventMachine::Deferrable) - stop_reactor + context 'message #connection_id' do + let(:client_id) { random_str } + + it 'matches the current client connection_id' do + channel_client_two.attach do + presence_client_one.enter_client(client_id) + end + + presence_client_two.subscribe do |presence| + expect(presence.client_id).to eq(client_id) + expect(presence.connection_id).to eq(client_one.connection.id) + stop_reactor + end + end end - it 'calls the Deferrable callback on success' do - presence_client_one.enter_client('client_id').callback do |presence| - expect(presence).to eql(presence_client_one) - stop_reactor + it_should_behave_like 'a public presence method', :enter_client, nil, 'client_id' + + context 'without necessary capabilities to enter on behalf of another client' do + let(:restricted_client) do + Ably::Realtime::Client.new(default_options.merge(api_key: restricted_api_key, log_level: :fatal)) end + let(:restricted_channel) { restricted_client.channel("cansubscribe:channel") } + let(:restricted_presence) { restricted_channel.presence } + + it 'calls the Deferrable errback on capabilities failure' do + restricted_presence.enter_client('clientId').tap do |deferrable| + deferrable.callback { raise "Should not succeed" } + deferrable.errback { stop_reactor } + end + end end end context '#update_client' do context 'multiple times on the same channel with different client_ids' do @@ -487,21 +761,11 @@ end end end end - it 'returns a Deferrable' do - expect(presence_client_one.update_client('client_id')).to be_a(EventMachine::Deferrable) - stop_reactor - end - - it 'calls the Deferrable callback on success' do - presence_client_one.update_client('client_id').callback do |presence| - expect(presence).to eql(presence_client_one) - stop_reactor - end - end + it_should_behave_like 'a public presence method', :update_client, nil, 'client_id' end context '#leave_client' do context 'leaves a channel' do context 'multiple times on the same channel with different client_ids' do @@ -590,37 +854,110 @@ end end end end - it 'returns a Deferrable' do - expect(presence_client_one.leave_client('client_id')).to be_a(EventMachine::Deferrable) - stop_reactor - end - - it 'calls the Deferrable callback on success' do - presence_client_one.leave_client('client_id').callback do |presence| - expect(presence).to eql(presence_client_one) - stop_reactor - end - end + it_should_behave_like 'a public presence method', :leave_client, nil, 'client_id' end end context '#get' do - it 'returns a Deferrable' do - expect(presence_client_one.get).to be_a(EventMachine::Deferrable) + it 'returns a SafeDeferrable that catches exceptions in callbacks and logs them' do + expect(presence_client_one.get).to be_a(Ably::Util::SafeDeferrable) stop_reactor end it 'calls the Deferrable callback on success' do presence_client_one.get.callback do |presence| expect(presence).to eq([]) stop_reactor end end + it 'catches exceptions in the provided method block' do + expect(presence_client_one.logger).to receive(:error).with(/Intentional exception/) do + stop_reactor + end + presence_client_one.get { raise 'Intentional exception' } + end + + %w(detached failed).each do |state| + it "raise an exception if the channel is #{state}" do + channel_client_one.attach do + channel_client_one.change_state state.to_sym + expect { presence_client_one.get }.to raise_error Ably::Exceptions::IncompatibleStateForOperation, /Operation is not allowed when channel is in STATE.#{state}/i + stop_reactor + end + end + end + + context 'during a sync' do + let(:pages) { 2 } + let(:members_per_page) { 100 } + let(:sync_pages_received) { [] } + let(:client_options) { default_options.merge(log_level: :none) } + + def connect_members_deferrables + (members_per_page * pages + 1).times.map do |index| + presence_client_one.enter_client("client:#{index}") + end + end + + before do + # Reconfigure client library so that it makes no retry attempts and fails immediately + stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG', + Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge( + disconnected: { retry_every: 0.1, max_time_in_state: 0 }, + suspended: { retry_every: 0.1, max_time_in_state: 0 } + ) + end + + it 'fails if the connection fails' do + when_all(*connect_members_deferrables) do + channel_client_two.attach do + client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + sync_pages_received << protocol_message + force_connection_failure client_two if sync_pages_received.count == 1 + end + end + end + + presence_client_two.get.tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + stop_reactor + end + end + end + end + + it 'fails if the channel is detached' do + when_all(*connect_members_deferrables) do + channel_client_two.attach do + client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + # prevent any more SYNC messages coming through + client_two.connection.transport.__incoming_protocol_msgbus__.unsubscribe + channel_client_two.change_state :detaching + channel_client_two.change_state :detached + end + end + end + + presence_client_two.get.tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + stop_reactor + end + end + end + end + end + + # skip 'it fails if the connection changes to failed state' + it 'returns the current members on the channel' do presence_client_one.enter do presence_client_one.get do |members| expect(members.count).to eq(1) @@ -751,13 +1088,15 @@ expect(messages.map(&:action).map(&:to_sym)).to contain_exactly(:enter, :update, :leave) stop_reactor end - presence_client_one.enter - presence_client_one.update - presence_client_one.leave + presence_client_one.enter do + presence_client_one.update do + presence_client_one.leave + end + end end end end end @@ -990,12 +1329,33 @@ presence_client_one.leave end end end - skip 'ensure connection_id is unique and updated on ENTER' - skip 'ensure connection_id for presence member matches the messages they publish on the channel' - skip 'stop a call to get when the channel has not been entered' - skip 'stop a call to get when the channel has been entered but the list is not up to date' - skip 'presence will resume sync if connection is dropped mid-way' + context 'connection failure mid-way through a large member sync' do + let(:members_count) { 400 } + let(:sync_pages_received) { [] } + + # Will re-enable once https://github.com/ably/realtime/issues/91 is resolved + skip 'resumes the SYNC operation', em_timeout: 15 do + when_all(*members_count.times.map do |index| + presence_client_one.enter_client("client:#{index}") + end) do + channel_client_two.attach do + client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + sync_pages_received << protocol_message + force_connection_failure client_two if sync_pages_received.count == 2 + end + end + end + + presence_client_two.get do |members| + expect(members.count).to eql(members_count) + expect(members.map(&:member_key).uniq.count).to eql(members_count) + stop_reactor + end + end + end + end end end