# encoding: utf-8 require 'spec_helper' describe Ably::Realtime::Presence, :event_machine do include Ably::Modules::Conversions vary_by_protocol do let(:default_options) { { key: api_key, environment: environment, protocol: protocol } } let(:client_options) { default_options } let(:anonymous_client) { auto_close Ably::Realtime::Client.new(client_options) } let(:client_one_id) { random_str } let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(client_id: client_one_id)) } let(:client_two_id) { random_str } let(:client_two) { auto_close Ably::Realtime::Client.new(client_options.merge(client_id: client_two_id)) } let(:wildcard_token) { lambda { |token_params| Ably::Rest::Client.new(client_options).auth.request_token(client_id: '*') } } let(:channel_name) { "presence-#{random_str(4)}" } let(:channel_anonymous_client) { anonymous_client.channel(channel_name) } let(:presence_anonymous_client) { channel_anonymous_client.presence } let(:channel_client_one) { client_one.channel(channel_name) } let(:channel_rest_client_one) { client_one.rest_client.channel(channel_name) } let(:presence_client_one) { channel_client_one.presence } let(:channel_client_two) { client_two.channel(channel_name) } let(:presence_client_two) { channel_client_two.presence } let(:data_payload) { random_str } def force_connection_failure(client) # Prevent any further SYNC messages coming in on this connection client.connection.transport.send(:driver).remove_all_listeners('message') client.connection.transport.unbind end shared_examples_for 'a public presence method' do |method_name, expected_state, args, options = {}| let(:client_id) do if args.empty? random_str else args end end def setup_test(method_name, args, options) if options[:enter_first] acked = false received = false presence_client_one.public_send(method_name.to_s.gsub(/leave|update/, 'enter'), args) do acked = true yield if acked & received end presence_client_one.subscribe do |presence_message| expect(presence_message.action).to eq(:enter) presence_client_one.unsubscribe received = true yield if acked & received end else yield end end unless expected_state == :left it 'raise an exception if the channel is detached' do setup_test(method_name, args, options) do channel_client_one.attach do channel_client_one.transition_state_machine :detaching channel_client_one.once(:detached) do presence_client_one.public_send(method_name, args).tap do |deferrable| deferrable.callback { raise 'Get should not succeed' } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::InvalidState) expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/) stop_reactor end end end end end end it 'raise an exception if the channel becomes detached' do setup_test(method_name, args, options) do channel_client_one.attach do channel_client_one.transition_state_machine :detaching presence_client_one.public_send(method_name, args).tap do |deferrable| deferrable.callback { raise 'Get should not succeed' } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::InvalidState) expect(error.message).to match(/Operation failed as channel transitioned to STATE.Detached/) stop_reactor end end end end end it 'raise an exception if the channel is failed' do setup_test(method_name, args, options) do channel_client_one.attach do channel_client_one.transition_state_machine :failed expect(channel_client_one.state).to eq(:failed) presence_client_one.public_send(method_name, args).tap do |deferrable| deferrable.callback { raise 'Get should not succeed' } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::InvalidState) expect(error.message).to match(/Operation is not allowed when channel is in STATE.Failed/) stop_reactor end end end end end it 'raise an exception if the channel becomes failed' do setup_test(method_name, args, options) do channel_client_one.attach do presence_client_one.public_send(method_name, args).tap do |deferrable| deferrable.callback { raise 'Get should not succeed' } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::MessageDeliveryFailed) stop_reactor end end channel_client_one.transition_state_machine :failed expect(channel_client_one.state).to eq(:failed) end end end it 'implicitly attaches the channel' do expect(channel_client_one).to_not be_attached presence_client_one.public_send(method_name, args) do expect(channel_client_one).to be_attached stop_reactor end end context 'when :queue_messages client option is false' do let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(queue_messages: false, client_id: client_id)) } context 'and connection state initialized' do it 'fails the deferrable' do presence_client_one.public_send(method_name, args).errback do |error| expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled) stop_reactor end expect(client_one.connection).to be_initialized end end context 'and connection state connecting' do it 'fails the deferrable' do client_one.connect EventMachine.next_tick do presence_client_one.public_send(method_name, args).errback do |error| expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled) stop_reactor end expect(client_one.connection).to be_connecting end end end context 'and connection state disconnected' do let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(queue_messages: false, client_id: client_id, :log_level => :error)) } it 'fails the deferrable' do client_one.connection.once(:connected) do client_one.connection.once(:disconnected) do presence_client_one.public_send(method_name, args).errback do |error| expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled) stop_reactor end expect(client_one.connection).to be_disconnected end client_one.connection.transition_state_machine :disconnected end end end context 'and connection state connected' do it 'publishes the message' do client_one.connection.once(:connected) do presence_client_one.public_send(method_name, args) stop_reactor end end end end end context 'with supported data payload content type' do def register_presence_and_check_data(method_name, data) if method_name.to_s.match(/_client/) presence_client_one.public_send(method_name, client_id, data) else presence_client_one.public_send(method_name, data) end presence_client_one.subscribe do |presence_message| expect(presence_message.data).to eql(data) stop_reactor end end context 'JSON Object (Hash)' do let(:data) { { 'Hash' => 'true' } } it 'is encoded and decoded to the same hash' do setup_test(method_name, args, options) do register_presence_and_check_data method_name, data end end end context 'JSON Array' do let(:data) { [ nil, true, false, 55, 'string', { 'Hash' => true }, ['array'] ] } it 'is encoded and decoded to the same Array' do setup_test(method_name, args, options) do register_presence_and_check_data method_name, data end end end context 'String' do let(:data) { random_str } it 'is encoded and decoded to the same Array' do setup_test(method_name, args, options) do register_presence_and_check_data method_name, data end end end context 'Binary' do let(:data) { Base64.encode64(random_str) } it 'is encoded and decoded to the same Array' do setup_test(method_name, args, options) do register_presence_and_check_data method_name, data end end end end context 'with unsupported data payload content type' do def presence_action(method_name, data) if method_name.to_s.match(/_client/) presence_client_one.public_send(method_name, client_id, data) else presence_client_one.public_send(method_name, data) end end context 'Integer' do let(:data) { 1 } it 'raises an UnsupportedDataType 40013 exception' do expect { presence_action(method_name, data) }.to raise_error(Ably::Exceptions::UnsupportedDataType) stop_reactor end end context 'Float' do let(:data) { 1.1 } it 'raises an UnsupportedDataType 40013 exception' do expect { presence_action(method_name, data) }.to raise_error(Ably::Exceptions::UnsupportedDataType) stop_reactor end end context 'Boolean' do let(:data) { true } it 'raises an UnsupportedDataType 40013 exception' do expect { presence_action(method_name, data) }.to raise_error(Ably::Exceptions::UnsupportedDataType) stop_reactor end end context 'False' do let(:data) { false } it 'raises an UnsupportedDataType 40013 exception' do expect { presence_action(method_name, data) }.to raise_error(Ably::Exceptions::UnsupportedDataType) stop_reactor end end end it 'returns a SafeDeferrable that catches exceptions in callbacks and logs them' do setup_test(method_name, args, options) do expect(presence_client_one.public_send(method_name, args)).to be_a(Ably::Util::SafeDeferrable) stop_reactor end end it 'allows a block to be passed in that is executed upon success' do setup_test(method_name, args, options) do presence_client_one.public_send(method_name, args) do stop_reactor end end end it 'calls the Deferrable callback on success' do setup_test(method_name, args, options) do presence_client_one.public_send(method_name, args).callback do |presence| expect(presence).to eql(presence_client_one) expect(presence_client_one.state).to eq(expected_state) if expected_state stop_reactor end end end it 'catches exceptions in the provided method block and logs them to the logger' do setup_test(method_name, args, options) do expect(presence_client_one.logger).to receive(:error) do |*args, &block| expect(args.concat([block ? block.call : nil]).join(',')).to match(/Intentional exception/) stop_reactor end presence_client_one.public_send(method_name, args) { raise 'Intentional exception' } end end context 'if connection fails before success' do let(:client_options) { default_options.merge(log_level: :none) } it 'calls the Deferrable errback if channel is detached' do setup_test(method_name, args, options) do channel_client_one.attach do client_one.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| # Don't allow any messages to reach the server client_one.connection.__outgoing_protocol_msgbus__.unsubscribe error_message = Ably::Models::ProtocolMessage.new(action: 9, error: { message: 'force failure' }) client_one.connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message end presence_client_one.public_send(method_name, args).tap do |deferrable| deferrable.callback { raise 'Should not succeed' } deferrable.errback do |error| expect(error).to be_kind_of(Ably::Exceptions::BaseAblyException) stop_reactor end end end end end end end shared_examples_for 'a presence on behalf of another client method' do |method_name| context ":#{method_name} when authenticated with a wildcard client_id" do let(:token) { Ably::Rest::Client.new(default_options).auth.request_token(client_id: '*').token } let(:client_options) { default_options.merge(key: nil, token: token) } let(:client) { auto_close Ably::Realtime::Client.new(client_options) } let(:presence_channel) { client.channels.get(channel_name).presence } context 'and a valid client_id' do it 'succeeds' do presence_channel.public_send(method_name, 'clientId') do EM.add_timer(0.5) { stop_reactor } end.tap do |deferrable| deferrable.errback { raise 'Should have succeeded' } end end end context 'and a wildcard client_id' do it 'throws an exception' do expect { presence_channel.public_send(method_name, '*') }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end context 'and an empty client_id' do it 'throws an exception' do expect { presence_channel.public_send(method_name, nil) }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end context 'and a client_id that is not a string type' do it 'throws an exception' do expect { presence_channel.public_send(method_name, 1) }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end end context ":#{method_name} when authenticated with a valid client_id" do let(:token) { Ably::Rest::Client.new(default_options).auth.request_token(client_id: 'valid').token } let(:client_options) { default_options.merge(key: nil, token: token) } let(:client) { auto_close Ably::Realtime::Client.new(client_options.merge(log_level: :error)) } let(:channel) { client.channels.get(channel_name) } let(:presence_channel) { channel.presence } context 'and another invalid client_id' do context 'before authentication' do it 'allows the operation and then Ably rejects the operation' do presence_channel.public_send(method_name, 'invalid').errback do |error| expect(error.code).to eql(40012) stop_reactor end end end context 'after authentication' do it 'throws an exception' do channel.attach do expect { presence_channel.public_send(method_name, 'invalid') }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end end end context 'and a wildcard client_id' do it 'throws an exception' do expect { presence_channel.public_send(method_name, '*') }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end context 'and an empty client_id' do it 'throws an exception' do expect { presence_channel.public_send(method_name, nil) }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end end context ":#{method_name} when anonymous and no client_id" do let(:token) { Ably::Rest::Client.new(default_options).auth.request_token(client_id: nil).token } let(:client_options) { default_options.merge(key: nil, token: token) } let(:client) { auto_close Ably::Realtime::Client.new(client_options.merge(log_level: :error)) } let(:channel) { client.channels.get(channel_name) } let(:presence_channel) { channel.presence } context 'and another invalid client_id' do context 'before authentication' do it 'allows the operation and then Ably rejects the operation' do presence_channel.public_send(method_name, 'invalid').errback do |error| expect(error.code).to eql(40012) stop_reactor end end end context 'after authentication' do it 'throws an exception' do channel.attach do expect { presence_channel.public_send(method_name, 'invalid') }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end end end context 'and a wildcard client_id' do it 'throws an exception' do expect { presence_channel.public_send(method_name, '*') }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end context 'and an empty client_id' do it 'throws an exception' do expect { presence_channel.public_send(method_name, nil) }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end end end context 'when attached (but not present) on a presence channel with an anonymous client (no client ID)' do it 'maintains state as other clients enter and leave the channel (#RTP2e)' do channel_anonymous_client.attach do presence_anonymous_client.subscribe(:enter) do |presence_message| expect(presence_message.client_id).to eql(client_one.client_id) presence_anonymous_client.get do |members| expect(members.first.client_id).to eql(client_one.client_id) expect(members.first.action).to eq(:present) presence_anonymous_client.subscribe(:leave) do |leave_presence_message| expect(leave_presence_message.client_id).to eql(client_one.client_id) presence_anonymous_client.get do |members_once_left| expect(members_once_left.count).to eql(0) stop_reactor end end end end end presence_client_one.enter do EventMachine.add_timer(0.5) do presence_client_one.leave end end end end context '#members map / PresenceMap (#RTP2)', api_private: true do it 'is available once the channel is created' do expect(presence_anonymous_client.members).to_not be_nil stop_reactor end it 'is not synchronised when initially created' do expect(presence_anonymous_client.members).to_not be_sync_complete stop_reactor end it 'will emit an :in_sync event when synchronisation is complete' do presence_client_one.enter presence_client_two.enter presence_anonymous_client.members.once(:in_sync) do stop_reactor end channel_anonymous_client.attach end context 'before server sync complete' do it 'behaves like an Enumerable allowing direct access to current members' do expect(presence_anonymous_client.members.count).to eql(0) expect(presence_anonymous_client.members.map(&:member_key)).to eql([]) stop_reactor end end context 'once server sync is complete' do it 'behaves like an Enumerable allowing direct access to current members' do presence_client_one.enter presence_client_two.enter entered = 0 presence_client_one.subscribe(:enter) do entered += 1 next unless entered == 2 presence_anonymous_client.members.once(:in_sync) do expect(presence_anonymous_client.members.count).to eql(2) member_ids = presence_anonymous_client.members.map(&:member_key) expect(member_ids.count).to eql(2) expect(member_ids.uniq.count).to eql(2) stop_reactor end channel_anonymous_client.attach end end end context 'the map is based on the member_key (connection_id & client_id)' do # 2 unqiue client_id with client_id "b" being on two connections let(:enter_action) { 2 } let(:presence_data) do [ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action }, { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: enter_action }, { client_id: 'a', connection_id: 'one', id: 'one:0:2', action: enter_action }, { client_id: 'b', connection_id: 'one', id: 'one:0:3', action: enter_action }, { client_id: 'b', connection_id: 'two', id: 'two:0:4', action: enter_action } ] end it 'ensures uniqueness from this member_key (#RTP2a)' do channel_anonymous_client.attach do presence_anonymous_client.get do |members| expect(members.length).to eql(0) ## Fabricate members action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, connection_serial: 20, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 ) anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, presence_msg EventMachine.add_timer(0.5) do presence_anonymous_client.get do |members| expect(members.length).to eql(3) expect(members.map { |member| member.client_id }.uniq).to contain_exactly('a', 'b') stop_reactor end end end end end end context 'newness is compared based on PresenceMessage#id unless synthesized' do let(:page_size) { 100 } let(:enter_expected_count) { page_size + 1 } # 100 per page, this ensures we have more than one page so that we can test newness during sync let(:enter_action) { 2 } let(:leave_action) { 3 } let(:now) { Time.now.to_i * 1000 } let(:entered) { [] } let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(auth_callback: wildcard_token)) } def setup_members_on(presence) enter_expected_count.times do |indx| # 10 messages per second max rate on simulation accounts rate = indx.to_f / 10 EventMachine.add_timer(rate) do presence.enter_client("client:#{indx}") do |message| entered << message next unless entered.count == enter_expected_count yield end end end end def allow_sync_fabricate_data_final_sync_and_assert_members setup_members_on(presence_client_one) do sync_pages_received = [] anonymous_client.connection.once(:connected) do anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message if sync_pages_received.count == 1 action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, connection_serial: anonymous_client.connection.serial + 1, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 ) anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, presence_msg # Now simulate an end to the sync action = Ably::Models::ProtocolMessage::ACTION.Sync sync_msg = Ably::Models::ProtocolMessage.new( action: action, connection_serial: anonymous_client.connection.serial + 2, channel: channel_name, channel_serial: 'validserialprefix:', # with no part after the `:` this indicates the end to the SYNC presence: [], timestamp: Time.now.to_i * 1000 ) anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_msg # Stop the next SYNC arriving anonymous_client.connection.__incoming_protocol_msgbus__.unsubscribe end end end presence_anonymous_client.get do |members| expect(members.length).to eql(page_size + 2) expect(members.find { |member| member.client_id == 'a' }).to be_nil expect(members.find { |member| member.client_id == 'b' }.timestamp.to_i).to eql(now / 1000) expect(members.find { |member| member.client_id == 'c' }.timestamp.to_i).to eql(now / 1000) stop_reactor end end end end context 'when presence messages are synthesized' do let(:presence_data) do [ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action, timestamp: now }, # first messages from client, second fabricated { client_id: 'a', connection_id: 'one', id: 'fabricated:0:1', action: leave_action, timestamp: now + 1 }, # leave after enter based on timestamp { client_id: 'b', connection_id: 'one', id: 'one:0:2', action: enter_action, timestamp: now }, # first messages from client, second fabricated { client_id: 'b', connection_id: 'one', id: 'fabricated:0:3', action: leave_action, timestamp: now - 1 }, # leave before enter based on timestamp { client_id: 'c', connection_id: 'one', id: 'fabricated:0:4', action: enter_action, timestamp: now }, # both messages fabricated { client_id: 'c', connection_id: 'one', id: 'fabricated:0:5', action: leave_action, timestamp: now - 1 } # leave before enter based on timestamp ] end it 'compares based on timestamp if either has a connectionId not part of the presence message id (#RTP2b1)' do allow_sync_fabricate_data_final_sync_and_assert_members end end context 'when presence messages are not synthesized (events sent from clients)' do let(:presence_data) do [ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action, timestamp: now }, # first messages from client { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action, timestamp: now - 1 }, # leave after enter based on msgSerial part of ID { client_id: 'b', connection_id: 'one', id: 'one:2:2', action: enter_action, timestamp: now }, # first messages from client { client_id: 'b', connection_id: 'one', id: 'one:2:1', action: leave_action, timestamp: now + 1 }, # leave before enter based on index part of ID { client_id: 'c', connection_id: 'one', id: 'one:4:4', action: enter_action, timestamp: now }, # first messages from client { client_id: 'c', connection_id: 'one', id: 'one:3:5', action: leave_action, timestamp: now + 1 } # leave before enter based on msgSerial part of ID ] end it 'compares based on timestamp if either has a connectionId not part of the presence message id (#RTP2b2)' do allow_sync_fabricate_data_final_sync_and_assert_members end end end end context '#sync_complete? and SYNC flags (#RTP1)' do context 'when attaching to a channel without any members present' do xit 'sync_complete? is true, there is no presence flag, and the presence channel is considered synced immediately (#RTP1)' do flag_checked = false anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :attached flag_checked = true expect(protocol_message.has_presence_flag?).to eql(false) end end channel_anonymous_client.attach do expect(channel_anonymous_client.presence).to be_sync_complete EventMachine.next_tick do expect(flag_checked).to eql(true) stop_reactor end end end end context 'when attaching to a channel with members present' do it 'sync_complete? is false, there is a presence flag, and the presence channel is subsequently synced (#RTP1)' do flag_checked = false anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :attached flag_checked = true expect(protocol_message.has_presence_flag?).to eql(true) end end presence_client_one.enter presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter channel_anonymous_client.attach do expect(channel_anonymous_client.presence).to_not be_sync_complete channel_anonymous_client.presence.get do expect(channel_anonymous_client.presence).to be_sync_complete EventMachine.next_tick do expect(flag_checked).to eql(true) stop_reactor end end end end end end end context '101 existing (present) members on a channel (2 SYNC pages)' do context 'requiring at least 2 SYNC ProtocolMessages', em_timeout: 40 do let(:enter_expected_count) { 101 } let(:present) { [] } let(:entered) { [] } let(:sync_pages_received) { [] } let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } def setup_members_on(presence) enter_expected_count.times do |indx| # 10 messages per second max rate on simulation accounts rate = indx.to_f / 10 EventMachine.add_timer(rate) do presence.enter_client("client:#{indx}") do |message| entered << message next unless entered.count == enter_expected_count yield end end end end context 'when a client attaches to the presence channel' do it 'emits :present for each member' do setup_members_on(presence_client_one) do presence_anonymous_client.subscribe(:present) do |present_message| expect(present_message.action).to eq(:present) present << present_message next unless present.count == enter_expected_count expect(present.map(&:client_id).uniq.count).to eql(enter_expected_count) stop_reactor end end end context 'and a member enters before the SYNC operation is complete' do let(:enter_client_id) { random_str } it 'emits a :enter immediately and the member is :present once the sync is complete (#RTP2g)' do setup_members_on(presence_client_one) do member_entered = false anonymous_client.connect do presence_anonymous_client.subscribe(:enter) do |member| expect(member.client_id).to eql(enter_client_id) member_entered = true end presence_anonymous_client.get do |members| expect(members.find { |member| member.client_id == enter_client_id }.action).to eq(:present) stop_reactor end anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message if sync_pages_received.count == 1 enter_action = Ably::Models::PresenceMessage::ACTION.Enter enter_member = Ably::Models::PresenceMessage.new( 'id' => "#{client_one.connection.id}:#{random_str}:0", 'clientId' => enter_client_id, 'connectionId' => client_one.connection.id, 'timestamp' => as_since_epoch(Time.now), 'action' => enter_action ) presence_anonymous_client.__incoming_msgbus__.publish :presence, enter_member end end end end end end end context 'and a member leaves before the SYNC operation is complete' do it 'emits :leave immediately as the member leaves and cleans up the ABSENT member after (#RTP2f, #RTP2g)' do all_client_ids = enter_expected_count.times.map { |id| "client:#{id}" } setup_members_on(presence_client_one) do leave_member = nil presence_anonymous_client.subscribe(:present) do |present_message| present << present_message all_client_ids.delete(present_message.client_id) end presence_anonymous_client.subscribe(:leave) do |leave_message| expect(leave_message.client_id).to eql(leave_member.client_id) expect(present.count).to be < enter_expected_count # Hacky accessing a private method, but absent members are intentionally not exposed to any public APIs expect(presence_anonymous_client.members.send(:absent_members).length).to eql(1) presence_anonymous_client.members.once(:in_sync) do # Check that members count is exact indicating the members with LEAVE action after sync are removed expect(presence_anonymous_client).to be_sync_complete expect(presence_anonymous_client.members.length).to eql(enter_expected_count - 1) presence_anonymous_client.unsubscribe stop_reactor end end anonymous_client.connect do anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message if sync_pages_received.count == 1 leave_action = Ably::Models::PresenceMessage::ACTION.Leave leave_member = Ably::Models::PresenceMessage.new( 'id' => "#{client_one.connection.id}:#{all_client_ids.first}:0", 'clientId' => all_client_ids.first, 'connectionId' => client_one.connection.id, 'timestamp' => as_since_epoch(Time.now), 'action' => leave_action ) presence_anonymous_client.__incoming_msgbus__.publish :presence, leave_member end end end end end end it 'ignores presence events with timestamps / identifiers prior to the current :present event in the MembersMap (#RTP2c)' do started_at = Time.now setup_members_on(presence_client_one) do leave_member = nil presence_anonymous_client.subscribe(:present) do |present_message| present << present_message if present.count == enter_expected_count presence_anonymous_client.get do |members| member = members.find { |member| member.client_id == leave_member.client_id} expect(member).to_not be_nil expect(member.action).to eq(:present) EventMachine.add_timer(1) do presence_anonymous_client.unsubscribe stop_reactor end end end end presence_anonymous_client.subscribe(:leave) do |leave_message| raise "Leave event for #{leave_message} should not have been fired because it is out of date" end anonymous_client.connect do anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message if sync_pages_received.count == 1 first_member = protocol_message.presence[0] # get the first member in the SYNC set leave_action = Ably::Models::PresenceMessage::ACTION.Leave leave_member = Ably::Models::PresenceMessage.new( first_member.as_json.merge('action' => leave_action, 'timestamp' => as_since_epoch(started_at)) ) # After the SYNC has started, no inject that member has having left with a timestamp before the sync presence_anonymous_client.__incoming_msgbus__.publish :presence, leave_member end end end end end end it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get (#RTP2f)' do left_client = 10 left_client_id = "client:#{left_client}" setup_members_on(presence_client_one) do member_left_emitted = false presence_anonymous_client.subscribe(:present) do |present_message| if present_message.client_id == left_client_id raise "Member #{present_message.client_id} should not have been emitted as present" end present << present_message.client_id end presence_anonymous_client.subscribe(:leave) do |leave_message| if present.include?(leave_message.client_id) raise "Member #{leave_message.client_id} should not have been emitted as present previously" end expect(leave_message.client_id).to eql(left_client_id) member_left_emitted = true end presence_anonymous_client.get do |members| expect(members.count).to eql(enter_expected_count - 1) expect(member_left_emitted).to eql(true) expect(members.map(&:client_id)).to_not include(left_client_id) EventMachine.add_timer(1) do presence_anonymous_client.unsubscribe stop_reactor end end channel_anonymous_client.attach do leave_action = Ably::Models::PresenceMessage::ACTION.Leave fake_leave_presence_message = Ably::Models::PresenceMessage.new( 'id' => "#{client_one.connection.id}:#{left_client_id}:0", 'clientId' => left_client_id, 'connectionId' => client_one.connection.id, 'timestamp' => as_since_epoch(Time.now), 'action' => leave_action ) # Push out a LEAVE event directly to the Presence object before it's received the :present action via the SYNC ProtocolMessage presence_anonymous_client.__incoming_msgbus__.publish :presence, fake_leave_presence_message end end end end context '#get' do context 'by default' do it 'waits until sync is complete (#RTP11c1)', em_timeout: 30 do # allow for slow connections and lots of messages enter_expected_count.times do |indx| EventMachine.add_timer(indx / 10) do presence_client_one.enter_client "client:#{indx}" end end presence_client_one.subscribe(:enter) do |message| entered << message next unless entered.count == enter_expected_count presence_anonymous_client.get do |members| expect(members.map(&:client_id).uniq.count).to eql(enter_expected_count) expect(members.count).to eql(enter_expected_count) stop_reactor end end end end context 'with :wait_for_sync option set to false (#RTP11c1)' do it 'it does not wait for sync', em_timeout: 30 do # allow for slow connections and lots of messages enter_expected_count.times do |indx| EventMachine.add_timer(indx / 10) do presence_client_one.enter_client "client:#{indx}" presence_client_one.subscribe(:enter) do |message| entered << message next unless entered.count == enter_expected_count channel_anonymous_client.attach do presence_anonymous_client.get(wait_for_sync: false) do |members| expect(presence_anonymous_client.members).to_not be_in_sync expect(members.count).to eql(0) stop_reactor end end end end end end end end end end end context 'state' do context 'once opened' do it 'once opened, enters the :left state if the channel detaches' do detached = false channel_client_one.presence.on(:left) do expect(channel_client_one.presence.state).to eq(:left) EventMachine.next_tick do expect(detached).to eq(true) stop_reactor end end channel_client_one.presence.enter do |presence| expect(presence.state).to eq(:entered) channel_client_one.detach do expect(channel_client_one.state).to eq(:detached) detached = true end end end end end context '#enter' do context 'data attribute' do context 'when provided as argument option to #enter' do it 'changes to value provided in #leave' do leave_callback_called = false presence_client_one.enter('stored') do expect(presence_client_one.data).to eql('stored') presence_client_one.leave do |presence| leave_callback_called = true end presence_client_one.on(:left) do expect(presence_client_one.data).to eql(nil) EventMachine.next_tick do expect(leave_callback_called).to eql(true) stop_reactor end end end end end end context 'message #connection_id' do it 'matches the current client connection_id' do channel_client_two.attach do presence_client_two.subscribe do |presence| expect(presence.connection_id).to eq(client_one.connection.id) stop_reactor end presence_client_one.enter end end end context 'without necessary capabilities to join presence' do let(:restricted_client) do auto_close Ably::Realtime::Client.new(default_options.merge(key: restricted_api_key, log_level: :fatal)) end let(:restricted_channel) { restricted_client.channel("cansubscribe:channel") } let(:restricted_presence) { restricted_channel.presence } it 'calls the Deferrable errback on capabilities failure' do restricted_presence.enter_client('bob').tap do |deferrable| deferrable.callback { raise "Should not succeed" } deferrable.errback { stop_reactor } end end end it_should_behave_like 'a public presence method', :enter, :entered, {} end context '#update' do it 'without previous #enter automatically enters' do presence_client_one.update(data_payload) do EventMachine.add_timer(1) do expect(presence_client_one.state).to eq(:entered) stop_reactor end end end context 'when ENTERED' do it 'has no effect on the state' do presence_client_one.enter do presence_client_one.once_state_changed { fail 'State should not have changed ' } presence_client_one.update(data_payload) do EventMachine.add_timer(1) do expect(presence_client_one.state).to eq(:entered) presence_client_one.off stop_reactor end end end end end it 'updates the data if :data argument provided' do channel_client_one.attach do presence_client_one.enter('prior') do presence_client_one.update(data_payload) end presence_client_one.subscribe(:update) do |message| expect(message.data).to eql(data_payload) stop_reactor end end end it 'updates the data to nil if :data argument is not provided (assumes nil value)' do channel_client_one.attach do presence_client_one.enter('prior') do presence_client_one.update end presence_client_one.subscribe(:update) do |message| expect(message.data).to be_nil stop_reactor end end end it_should_behave_like 'a public presence method', :update, :entered, {}, enter_first: true end context '#leave' do context ':data option' do let(:data) { random_str } let(:enter_data) { random_str } context 'when set to a string' do it 'emits the new data for the leave event' do channel_client_one.attach do presence_client_one.enter enter_data do presence_client_one.leave data end presence_client_one.subscribe(:leave) do |presence_message| expect(presence_message.data).to eql(data) stop_reactor end end end end context 'when set to nil' do it 'emits the last value for the data attribute when leaving' do channel_client_one.attach do presence_client_one.enter enter_data do presence_client_one.leave nil end presence_client_one.subscribe(:leave) do |presence_message| expect(presence_message.data).to eql(enter_data) stop_reactor end end end end context 'when not passed as an argument (i.e. nil)' do it 'emits the previous value for the data attribute when leaving' do channel_client_one.attach do presence_client_one.enter enter_data do presence_client_one.leave end presence_client_one.subscribe(:leave) do |presence_message| expect(presence_message.data).to eql(enter_data) stop_reactor end end end end context 'and sync is complete' do it 'does not cache members that have left' do enter_ack = false channel_client_one.attach do presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter EventMachine.add_timer(0.5) do expect(presence_client_one.members).to be_in_sync expect(presence_client_one.members.send(:members).count).to eql(1) presence_client_one.leave data end end presence_client_one.enter(enter_data) do enter_ack = true end presence_client_one.subscribe(:leave) do |presence_message| presence_client_one.unsubscribe :leave expect(presence_message.data).to eql(data) expect(presence_client_one.members.send(:members).count).to eql(0) expect(enter_ack).to eql(true) stop_reactor end end end end end it 'succeeds and does not emit an event (#RTP10d)' do channel_client_one.attach do channel_client_one.presence.leave do # allow enough time for leave event to (not) fire EventMachine.add_timer(2) do stop_reactor end end channel_client_one.subscribe(:leave) do raise "No leave event should fire" end end end it_should_behave_like 'a public presence method', :leave, :left, {}, enter_first: true end context ':left event' do it 'emits the data defined in enter' do channel_client_two.attach do channel_client_one.presence.enter('data') do channel_client_one.presence.leave end channel_client_two.presence.subscribe(:leave) do |message| expect(message.data).to eql('data') stop_reactor end end end it 'emits the data defined in update' do channel_client_two.attach do channel_client_one.presence.enter('something else') do channel_client_one.presence.update('data') do channel_client_one.presence.leave end end channel_client_two.presence.subscribe(:leave) do |message| expect(message.data).to eql('data') stop_reactor end end end end context 'entering/updating/leaving presence state on behalf of another client_id' do let(:client_count) { 5 } let(:clients) { [] } let(:data) { random_str } let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } let(:client_two) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } context '#enter_client' do context 'multiple times on the same channel with different client_ids' do it "has no affect on the client's presence state and only enters on behalf of the provided client_id" do client_count.times do |client_id| presence_client_one.enter_client("client:#{client_id}") do presence_client_one.on(:entered) { raise 'Should not have entered' } next unless client_id == client_count - 1 EventMachine.add_timer(1) do expect(presence_client_one.state).to eq(:initialized) stop_reactor end end end end it 'enters a channel and sets the data based on the provided :data option' do channel_anonymous_client.attach do client_count.times do |client_id| presence_client_one.enter_client("client:#{client_id}", data) end presence_anonymous_client.subscribe(:enter) do |presence| expect(presence.data).to eql(data) clients << presence next unless clients.count == 5 expect(clients.map(&:client_id).uniq.count).to eql(5) stop_reactor end end end end context 'message #connection_id' do let(:client_id) { random_str } it 'matches the current client connection_id' do channel_client_two.attach do presence_client_one.enter_client(client_id) presence_client_two.subscribe do |presence| expect(presence.client_id).to eq(client_id) expect(presence.connection_id).to eq(client_one.connection.id) stop_reactor end end end end context 'without necessary capabilities to enter on behalf of another client' do let(:restricted_client) do auto_close Ably::Realtime::Client.new(default_options.merge(key: restricted_api_key, log_level: :fatal)) end let(:restricted_channel) { restricted_client.channel("cansubscribe:channel") } let(:restricted_presence) { restricted_channel.presence } it 'calls the Deferrable errback on capabilities failure' do restricted_presence.enter_client('clientId').tap do |deferrable| deferrable.callback { raise "Should not succeed" } deferrable.errback { stop_reactor } end end end it_should_behave_like 'a public presence method', :enter_client, nil, 'client_id' it_should_behave_like 'a presence on behalf of another client method', :enter_client end context '#update_client' do context 'multiple times on the same channel with different client_ids' do it 'updates the data attribute for the member when :data option provided' do updated_callback_count = 0 channel_anonymous_client.attach do client_count.times do |client_id| presence_client_one.enter_client("client:#{client_id}") do presence_client_one.update_client("client:#{client_id}", data) do updated_callback_count += 1 end end end presence_anonymous_client.subscribe(:update) do |presence| expect(presence.data).to eql(data) clients << presence next unless clients.count == 5 wait_until(lambda { updated_callback_count == 5 }) do expect(clients.map(&:client_id).uniq.count).to eql(5) expect(updated_callback_count).to eql(5) stop_reactor end end end end it 'updates the data attribute to null for the member when :data option is not provided (assumed null)' do channel_anonymous_client.attach do presence_client_one.enter_client('client_1') do presence_client_one.update_client('client_1') end presence_anonymous_client.subscribe(:update) do |presence| expect(presence.client_id).to eql('client_1') expect(presence.data).to be_nil stop_reactor end end end it 'enters if not already entered' do updated_callback_count = 0 channel_anonymous_client.attach do client_count.times do |client_id| presence_client_one.update_client("client:#{client_id}", data) do updated_callback_count += 1 end end presence_anonymous_client.subscribe(:enter) do |presence| expect(presence.data).to eql(data) clients << presence next unless clients.count == 5 wait_until(lambda { updated_callback_count == 5 }) do expect(clients.map(&:client_id).uniq.count).to eql(5) expect(updated_callback_count).to eql(5) stop_reactor end end end end end it_should_behave_like 'a public presence method', :update_client, nil, 'client_id' it_should_behave_like 'a presence on behalf of another client method', :update_client end context '#leave_client' do context 'leaves a channel' do context 'multiple times on the same channel with different client_ids' do it 'emits the :leave event for each client_id' do left_callback_count = 0 channel_anonymous_client.attach do client_count.times do |client_id| presence_client_one.enter_client("client:#{client_id}", random_str) do presence_client_one.leave_client("client:#{client_id}", data) do left_callback_count += 1 end end end presence_anonymous_client.subscribe(:leave) do |presence| expect(presence.data).to eql(data) clients << presence next unless clients.count == 5 wait_until(lambda { left_callback_count == 5 }) do expect(clients.map(&:client_id).uniq.count).to eql(5) expect(left_callback_count).to eql(5) stop_reactor end end end end it 'succeeds if that client_id has not previously entered the channel' do left_callback_count = 0 channel_anonymous_client.attach do client_count.times do |client_id| presence_client_one.leave_client("client:#{client_id}") do left_callback_count += 1 end end presence_anonymous_client.subscribe(:leave) do |presence| expect(presence.data).to be_nil clients << presence next unless clients.count == 5 wait_until(lambda { left_callback_count == 5 }) do expect(clients.map(&:client_id).uniq.count).to eql(5) expect(left_callback_count).to eql(5) stop_reactor end end end end end context 'with a new value in :data option' do it 'emits the leave event with the new data value' do channel_client_one.attach do presence_client_one.enter_client("client:unique", random_str) do presence_client_one.leave_client("client:unique", data) end presence_client_one.subscribe(:leave) do |presence_message| expect(presence_message.data).to eql(data) stop_reactor end end end end context 'with a nil value in :data option' do it 'emits the leave event with the previous value as a convenience' do channel_client_one.attach do presence_client_one.enter_client("client:unique", data) do presence_client_one.leave_client("client:unique", nil) end presence_client_one.subscribe(:leave) do |presence_message| expect(presence_message.data).to eql(data) stop_reactor end end end end context 'with no :data option' do it 'emits the leave event with the previous value as a convenience' do channel_client_one.attach do presence_client_one.enter_client("client:unique", data) do presence_client_one.leave_client("client:unique") end presence_client_one.subscribe(:leave) do |presence_message| expect(presence_message.data).to eql(data) stop_reactor end end end end end it_should_behave_like 'a public presence method', :leave_client, nil, 'client_id' it_should_behave_like 'a presence on behalf of another client method', :leave_client end end context '#get' do it 'returns a SafeDeferrable that catches exceptions in callbacks and logs them' do expect(presence_client_one.get).to be_a(Ably::Util::SafeDeferrable) stop_reactor end it 'calls the Deferrable callback on success' do presence_client_one.get.callback do |presence| expect(presence).to eq([]) stop_reactor end end it 'catches exceptions in the provided method block' do expect(presence_client_one.logger).to receive(:error) do |*args, &block| expect(args.concat([block ? block.call : nil]).join(',')).to match(/Intentional exception/) stop_reactor end presence_client_one.get { raise 'Intentional exception' } end it 'implicitly attaches the channel (#RTP11b)' do expect(channel_client_one).to be_initialized presence_client_one.get do |members| expect(channel_client_one).to be_attached stop_reactor end end context 'when the channel is SUSPENDED' do context 'with wait_for_sync: true' do it 'results in an error with @code@ @91005@ and a @message@ stating that the presence state is out of sync (#RTP11d)' do presence_client_one.enter do channel_client_one.transition_state_machine! :suspended presence_client_one.get(wait_for_sync: true).errback do |error| expect(error.code).to eql(91005) expect(error.message).to match(/presence state is out of sync/i) stop_reactor end end end end context 'with wait_for_sync: false' do it 'returns the current PresenceMap and does not wait for the channel to change to the ATTACHED state (#RTP11d)' do presence_client_one.enter do channel_client_one.transition_state_machine! :suspended presence_client_one.get(wait_for_sync: false) do |members| expect(channel_client_one).to be_suspended stop_reactor end end end end end it 'fails if the connection is DETACHED (#RTP11b)' do channel_client_one.attach do channel_client_one.detach do presence_client_one.get.tap do |deferrable| deferrable.callback { raise 'Get should not succeed' } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::InvalidState) expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/) stop_reactor end end end end end it 'fails if the connection is FAILED (#RTP11b)' do channel_client_one.attach do channel_client_one.transition_state_machine :failed expect(channel_client_one.state).to eq(:failed) presence_client_one.get.tap do |deferrable| deferrable.callback { raise 'Get should not succeed' } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::InvalidState) expect(error.message).to match(/Operation is not allowed when channel is in STATE.Failed/) stop_reactor end end end end context 'during a sync', em_timeout: 30 do let(:pages) { 2 } let(:members_per_page) { 100 } let(:sync_pages_received) { [] } let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } let(:client_options) { default_options.merge(log_level: :none) } def connect_members_deferrables (members_per_page * pages + 1).times.map do |mem_index| # rate limit to 10 per second EventMachine::DefaultDeferrable.new.tap do |deferrable| EventMachine.add_timer(mem_index/10) do presence_client_one.enter_client("client:#{mem_index}").tap do |enter_deferrable| enter_deferrable.callback { |*args| deferrable.succeed *args } enter_deferrable.errback { |*args| deferrable.fail *args } end end end end end context 'when :wait_for_sync is true' do it 'fails if the connection becomes FAILED (#RTP11b)' do when_all(*connect_members_deferrables) do channel_client_two.attach do client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message if sync_pages_received.count == 1 error_message = Ably::Models::ProtocolMessage.new(action: 9, error: { message: 'force failure' }) client_two.connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message end end end end presence_client_two.get(wait_for_sync: true).tap do |deferrable| deferrable.callback { raise 'Get should not succeed' } deferrable.errback do |error| stop_reactor end end end end it 'fails if the channel becomes detached (#RTP11b)' do when_all(*connect_members_deferrables) do channel_client_two.attach do client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync # prevent any more SYNC messages coming through client_two.connection.transport.__incoming_protocol_msgbus__.unsubscribe channel_client_two.transition_state_machine :detaching channel_client_two.transition_state_machine :detached end end end presence_client_two.get(wait_for_sync: true).tap do |deferrable| deferrable.callback { raise 'Get should not succeed' } deferrable.errback do |error| stop_reactor end end end end end end it 'returns the current members on the channel (#RTP11a)' do presence_client_one.enter presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter presence_client_one.get do |members| expect(members.count).to eq(1) expect(client_one.client_id).to_not be_nil this_member = members.first expect(this_member.client_id).to eql(client_one.client_id) stop_reactor end end end it 'filters by connection_id option if provided (#RTP11c3)' do presence_client_one.enter do presence_client_two.enter end presence_client_one.subscribe(:enter) do |presence_message| # wait until the client_two enter event has been sent to client_one next unless presence_message.client_id == client_two.client_id presence_client_one.get(connection_id: client_one.connection.id) do |members| expect(members.count).to eq(1) expect(members.first.connection_id).to eql(client_one.connection.id) presence_client_one.get(connection_id: client_two.connection.id) do |members_two| expect(members_two.count).to eq(1) expect(members_two.first.connection_id).to eql(client_two.connection.id) stop_reactor end end end end it 'filters by client_id option if provided (#RTP11c2)' do presence_client_one.enter do presence_client_two.enter end presence_client_one.subscribe(:enter) do |presence_message| # wait until the client_two enter event has been sent to client_one next unless presence_message.client_id == client_two_id presence_client_one.get(client_id: client_one_id) do |members| expect(members.count).to eq(1) expect(members.first.client_id).to eql(client_one_id) expect(members.first.connection_id).to eql(client_one.connection.id) presence_client_one.get(client_id: client_two_id) do |members_two| expect(members_two.count).to eq(1) expect(members_two.first.client_id).to eql(client_two_id) expect(members_two.first.connection_id).to eql(client_two.connection.id) stop_reactor end end end end it 'does not wait for SYNC to complete if :wait_for_sync option is false (#RTP11c1)' do presence_client_one.enter presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter presence_client_two.get(wait_for_sync: false) do |members| expect(members.count).to eql(0) stop_reactor end end end it 'returns the list of members and waits for SYNC to complete by default (#RTP11a)' do presence_client_one.enter presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter presence_client_two.get do |members| expect(members.count).to eql(1) stop_reactor end end end context 'when a member enters and then leaves' do it 'has no members' do presence_client_one.enter do presence_client_one.leave end presence_client_one.subscribe(:leave) do presence_client_one.get do |members| expect(members.count).to eq(0) stop_reactor end end end end context 'when a member enters and the presence map is updated' do it 'adds the member as being :present (#RTP2d)' do presence_client_one.enter presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter presence_client_one.get do |members| expect(members.count).to eq(1) expect(members.first.action).to eq(:present) stop_reactor end end end end context 'with lots of members on different clients' do let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } let(:client_two) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } let(:members_per_client) { 10 } let(:clients_entered) { Hash.new { |hash, key| hash[key] = 0 } } let(:total_members) { members_per_client * 2 } it 'returns a complete list of members on all clients' do members_per_client.times do |indx| presence_client_one.enter_client("client_1:#{indx}") presence_client_two.enter_client("client_2:#{indx}") end presence_client_one.subscribe(:enter) do clients_entered[:client_one] += 1 end presence_client_two.subscribe(:enter) do clients_entered[:client_two] += 1 end wait_until(lambda { clients_entered[:client_one] + clients_entered[:client_two] == total_members * 2 }) do presence_anonymous_client.get(wait_for_sync: true) do |anonymous_members| expect(anonymous_members.count).to eq(total_members) expect(anonymous_members.map(&:client_id).uniq.count).to eq(total_members) presence_client_one.get(wait_for_sync: true) do |client_one_members| presence_client_two.get(wait_for_sync: true) do |client_two_members| expect(client_one_members.count).to eq(total_members) expect(client_one_members.count).to eq(client_two_members.count) stop_reactor end end end end end end end context '#subscribe' do let(:messages) { [] } context 'with no arguments' do it 'calls the callback for all presence events' do when_all(channel_client_one.attach, channel_client_two.attach) do presence_client_two.subscribe do |presence_message| messages << presence_message next unless messages.count == 3 expect(messages.map(&:action).map(&:to_sym)).to contain_exactly(:enter, :update, :leave) stop_reactor end presence_client_one.enter do presence_client_one.update do presence_client_one.leave end end end end end context 'with event name' do it 'calls the callback for specified presence event' do when_all(channel_client_one.attach, channel_client_two.attach) do presence_client_two.subscribe(:leave) do |presence_message| messages << presence_message next unless messages.count == 1 expect(messages.map(&:action).map(&:to_sym)).to contain_exactly(:leave) stop_reactor end presence_client_one.enter do presence_client_one.update do presence_client_one.leave end end end end end it 'implicitly attaches' do expect(client_one.connection).to be_initialized presence_client_one.subscribe { true } channel_client_one.on(:attached) do expect(client_one.connection).to be_connected expect(channel_client_one).to be_attached stop_reactor end end context 'with a callback that raises an exception' do let(:exception) { StandardError.new("Intentional error") } it 'logs the error and continues' do emitted_exception = false expect(client_one.logger).to receive(:error) do |*args, &block| expect(args.concat([block ? block.call : nil]).join(',')).to match(/#{exception.message}/) end presence_client_one.subscribe do |presence_message| emitted_exception = true raise exception end presence_client_one.enter do EventMachine.add_timer(1) do expect(emitted_exception).to eql(true) stop_reactor end end end end end context '#unsubscribe' do context 'with no arguments' do it 'removes the callback for all presence events' do when_all(channel_client_one.attach, channel_client_two.attach) do subscribe_callback = lambda { raise 'Should not be called' } presence_client_two.subscribe(&subscribe_callback) presence_client_two.unsubscribe(&subscribe_callback) presence_client_one.enter presence_client_one.update presence_client_one.leave do EventMachine.add_timer(1) do stop_reactor end end end end end context 'with event name' do it 'removes the callback for specified presence event' do when_all(channel_client_one.attach, channel_client_two.attach) do subscribe_callback = lambda { raise 'Should not be called' } presence_client_two.subscribe :leave, &subscribe_callback presence_client_two.unsubscribe :leave, &subscribe_callback presence_client_one.enter do presence_client_one.leave do EventMachine.add_timer(1) do stop_reactor end end end end end end end context 'REST #get' do it 'returns current members' do presence_client_one.enter data_payload presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter members_page = channel_rest_client_one.presence.get this_member = members_page.items.first expect(this_member).to be_a(Ably::Models::PresenceMessage) expect(this_member.client_id).to eql(client_one.client_id) expect(this_member.data).to eql(data_payload) stop_reactor end end it 'returns no members once left' do presence_client_one.enter(data_payload) do presence_client_one.leave presence_client_one.subscribe(:leave) do presence_client_one.unsubscribe :leave members_page = channel_rest_client_one.presence.get expect(members_page.items.count).to eql(0) stop_reactor end end end end context 'client_id with ASCII_8BIT' do let(:client_id) { random_str.encode(Encoding::ASCII_8BIT) } context 'in connection set up' do let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(client_id: client_id)) } it 'is converted into UTF_8' do presence_client_one.enter presence_client_one.on(:entered) do |presence| expect(presence.client_id.encoding).to eql(Encoding::UTF_8) expect(presence.client_id.encode(Encoding::ASCII_8BIT)).to eql(client_id) stop_reactor end end end context 'in channel options' do let(:client_one) { auto_close Ably::Realtime::Client.new(default_options) } it 'is converted into UTF_8' do channel_client_one.attach do presence_client_one.subscribe(:enter) do |presence| expect(presence.client_id.encoding).to eql(Encoding::UTF_8) expect(presence.client_id.encode(Encoding::ASCII_8BIT)).to eql(client_id) stop_reactor end presence_anonymous_client.enter_client(client_id) end end end end context 'encoding and decoding of presence message data' do let(:secret_key) { Ably::Util::Crypto.generate_random_key(256) } let(:cipher_options) { { key: secret_key, algorithm: 'aes', mode: 'cbc', key_length: 256 } } let(:channel_name) { random_str } let(:encrypted_channel) { client_one.channel(channel_name, cipher: cipher_options) } let(:channel_rest_client_one) { client_one.rest_client.channel(channel_name, cipher: cipher_options) } let(:crypto) { Ably::Util::Crypto.new(cipher_options) } let(:data) { { 'hash_id' => random_str } } let(:data_as_json) { data.to_json } let(:data_as_cipher) { crypto.encrypt(data.to_json) } it 'encrypts presence message data' do encrypted_channel.attach do encrypted_channel.presence.enter data end encrypted_channel.presence.__incoming_msgbus__.unsubscribe(:presence) # remove all subscribe callbacks that could decrypt the message encrypted_channel.presence.__incoming_msgbus__.subscribe(:presence) do |presence| if protocol == :json expect(presence['encoding']).to eql('json/utf-8/cipher+aes-256-cbc/base64') expect(crypto.decrypt(Base64.decode64(presence['data']))).to eql(data_as_json) else expect(presence['encoding']).to eql('json/utf-8/cipher+aes-256-cbc') expect(crypto.decrypt(presence['data'])).to eql(data_as_json) end stop_reactor end end context '#subscribe' do it 'emits decrypted enter events' do encrypted_channel.attach do encrypted_channel.presence.enter data end encrypted_channel.presence.subscribe(:enter) do |presence_message| expect(presence_message.encoding).to be_nil expect(presence_message.data).to eql(data) stop_reactor end end it 'emits decrypted update events' do encrypted_channel.attach do encrypted_channel.presence.enter('to be updated') do encrypted_channel.presence.update data end end encrypted_channel.presence.subscribe(:update) do |presence_message| expect(presence_message.encoding).to be_nil expect(presence_message.data).to eql(data) stop_reactor end end it 'emits previously set data for leave events' do encrypted_channel.attach do encrypted_channel.presence.enter(data) do encrypted_channel.presence.leave end end encrypted_channel.presence.subscribe(:leave) do |presence_message| expect(presence_message.encoding).to be_nil expect(presence_message.data).to eql(data) stop_reactor end end end context '#get' do it 'returns a list of members with decrypted data' do encrypted_channel.presence.enter(data) encrypted_channel.presence.subscribe(:enter) do encrypted_channel.presence.get do |members| member = members.first expect(member.encoding).to be_nil expect(member.data).to eql(data) stop_reactor end end end end context 'REST #get' do it 'returns a list of members with decrypted data' do encrypted_channel.presence.enter(data) encrypted_channel.presence.subscribe(:enter) do member = channel_rest_client_one.presence.get.items.first expect(member.encoding).to be_nil expect(member.data).to eql(data) stop_reactor end end end context 'when cipher settings do not match publisher' do let(:client_options) { default_options.merge(log_level: :fatal) } let(:incompatible_cipher_options) { { key: Ably::Util::Crypto.generate_random_key(128), algorithm: 'aes', mode: 'cbc', key_length: 128 } } let(:incompatible_encrypted_channel) { client_two.channel(channel_name, cipher: incompatible_cipher_options) } it 'delivers an unencoded presence message left with encoding value' do encrypted_channel.presence.enter data incompatible_encrypted_channel.presence.subscribe(:enter) do incompatible_encrypted_channel.presence.get do |members| member = members.first expect(member.encoding).to match(/cipher\+aes-256-cbc/) expect(member.data).to_not eql(data) stop_reactor end end end it 'emits an error when cipher does not match and presence data cannot be decoded' do incompatible_encrypted_channel.once(:attached) do expect(client_two.logger).to receive(:error) do |*args, &block| expect(args.concat([block ? block.call : nil]).join(',')).to match(/Cipher algorithm AES-128-CBC does not match/) stop_reactor end.at_least(:once) encrypted_channel.attach do encrypted_channel.presence.enter data end end incompatible_encrypted_channel.attach end end end context 'leaving' do specify 'expect :left event once underlying connection is closed' do presence_client_one.on(:left) do expect(presence_client_one.state).to eq(:left) stop_reactor end presence_client_one.enter do client_one.close end end specify 'expect :left event with client data from enter event' do presence_client_one.subscribe(:leave) do |message| presence_client_one.get(wait_for_sync: true) do |members| expect(members.count).to eq(0) expect(message.data).to eql(data_payload) stop_reactor end end presence_client_one.enter(data_payload) do presence_client_one.leave end end end context 'connection failure mid-way through a large member sync' do let(:members_count) { 201 } let(:sync_pages_received) { [] } let(:client_options) { default_options.merge(log_level: :fatal) } it 'resumes the SYNC operation (#RTP3)', em_timeout: 15 do when_all(*members_count.times.map do |indx| presence_anonymous_client.enter_client("client:#{indx}") end) do channel_client_two.attach do client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message force_connection_failure client_two if sync_pages_received.count == 1 end end end presence_client_two.get(wait_for_sync: true) do |members| expect(members.count).to eql(members_count) expect(members.map(&:member_key).uniq.count).to eql(members_count) stop_reactor end end end end context 'server-initiated sync' do context 'with multiple SYNC pages' do let(:present_action) { 1 } let(:leave_action) { 3 } let(:presence_sync_1) do [ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action }, { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action } ] end let(:presence_sync_2) do [ { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action } ] end it 'is initiated with a SYNC message and completed with a later SYNC message with no cursor value part of the channelSerial (#RTP18a, #RTP18b) ', em_timeout: 15 do presence_anonymous_client.get do |members| expect(members.length).to eql(0) expect(presence_anonymous_client).to be_sync_complete presence_anonymous_client.subscribe(:present) do expect(presence_anonymous_client).to_not be_sync_complete presence_anonymous_client.get do |members| expect(presence_anonymous_client).to be_sync_complete expect(members.length).to eql(1) expect(members.first.client_id).to eql('b') stop_reactor end end ## Fabricate server-initiated SYNC in two parts action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, connection_serial: 10, channel_serial: 'sequenceid:cursor', channel: channel_name, presence: presence_sync_1, timestamp: Time.now.to_i * 1000 ) anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message sync_message = Ably::Models::ProtocolMessage.new( action: action, connection_serial: 11, channel_serial: 'sequenceid:', # indicates SYNC is complete channel: channel_name, presence: presence_sync_2, timestamp: Time.now.to_i * 1000 ) anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message end end end context 'with a single SYNC page' do let(:present_action) { 1 } let(:leave_action) { 3 } let(:presence_sync) do [ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action }, { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action }, { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action } ] end it 'is initiated and completed with a single SYNC message (and no channelSerial) (#RTP18a, #RTP18c) ', em_timeout: 15 do presence_anonymous_client.get do |members| expect(members.length).to eql(0) expect(presence_anonymous_client).to be_sync_complete presence_anonymous_client.subscribe(:present) do expect(presence_anonymous_client).to_not be_sync_complete presence_anonymous_client.get do |members| expect(presence_anonymous_client).to be_sync_complete expect(members.length).to eql(1) expect(members.first.client_id).to eql('b') stop_reactor end end ## Fabricate server-initiated SYNC in two parts action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, connection_serial: 10, channel: channel_name, presence: presence_sync, timestamp: Time.now.to_i * 1000 ) anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message end end end context 'when members exist in the PresenceMap before a SYNC completes' do let(:enter_action) { Ably::Models::PresenceMessage::ACTION.Enter.to_i } let(:present_action) { Ably::Models::PresenceMessage::ACTION.Present.to_i } let(:presence_sync_protocol_message) do [ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action }, { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action } ] end let(:presence_enter_message) do Ably::Models::PresenceMessage.new( 'id' => "#{random_str}:#{random_str}:0", 'clientId' => random_str, 'connectionId' => random_str, 'timestamp' => as_since_epoch(Time.now), 'action' => enter_action ) end it 'removes the members that are no longer present (#RTP19)', em_timeout: 15 do presence_anonymous_client.get do |members| expect(members.length).to eql(0) # Now inject a fake member into the PresenceMap by faking the receive of a Presence message from Ably into the Presence object presence_anonymous_client.__incoming_msgbus__.publish :presence, presence_enter_message EventMachine.next_tick do presence_anonymous_client.get do |members| expect(members.length).to eql(1) expect(members.first.client_id).to eql(presence_enter_message.client_id) presence_events = [] presence_anonymous_client.subscribe do |presence_message| presence_events << [presence_message.client_id, presence_message.action.to_sym] if presence_message.action == :leave expect(presence_message.id).to be_nil expect(presence_message.timestamp.to_f * 1000).to be_within(200).of(Time.now.to_f * 1000) end end ## Fabricate server-initiated SYNC in two parts action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, connection_serial: 10, channel: channel_name, presence: presence_sync_protocol_message, timestamp: Time.now.to_i * 1000 ) anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message EventMachine.next_tick do presence_anonymous_client.get do |members| expect(members.length).to eql(2) expect(members.find { |member| member.client_id == presence_enter_message.client_id}).to be_nil expect(presence_events).to contain_exactly( ['a', :present], ['b', :present], [presence_enter_message.client_id, :leave] ) stop_reactor end end end end end end end end context 'when the client does not have presence subscribe privileges but is present on the channel' do let(:present_only_capability) do { channel_name => ["presence"] } end let(:present_only_callback) { lambda { |token_params| Ably::Rest::Client.new(client_options).auth.request_token(client_id: '*', capability: present_only_capability) } } let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: present_only_callback)) } it 'receives presence updates for all presence events generated by the current connection and the presence map is kept up to date (#RTP17a)' do enter_client_ids = [] presence_client_one.subscribe(:enter) do |presence_message| enter_client_ids << presence_message.client_id end leave_client_ids = [] presence_client_one.subscribe(:leave) do |presence_message| leave_client_ids << presence_message.client_id end presence_client_one.enter_client 'bob' do presence_client_one.enter_client 'sarah' end entered_count = 0 presence_client_one.subscribe(:enter) do entered_count += 1 next unless entered_count == 2 presence_client_one.unsubscribe :enter presence_client_one.get do |members| EventMachine.add_timer(1) do expect(members.map(&:client_id)).to contain_exactly('bob', 'sarah') expect(enter_client_ids).to contain_exactly('bob', 'sarah') presence_client_one.leave_client 'bob' do presence_client_one.leave_client 'sarah' end leave_count = 0 presence_client_one.subscribe(:leave) do leave_count += 1 next unless leave_count == 2 presence_client_one.get do |members| expect(members.length).to eql(0) expect(leave_client_ids).to contain_exactly('bob', 'sarah') stop_reactor end end end end end end end context "local PresenceMap for presence members entered by this client" do it "maintains a copy of the member map for any member that shares this connection's connection ID (#RTP17)" do presence_client_one.enter do presence_client_two.enter end entered_count = 0 presence_client_one.subscribe(:enter) do entered_count += 1 next unless entered_count == 2 channel_anonymous_client.attach do channel_anonymous_client.presence.get do |members| expect(channel_anonymous_client.presence.members.local_members).to be_empty expect(presence_client_one.members.local_members.length).to eql(1) expect(presence_client_one.members.local_members.values.first.connection_id).to eql(client_one.connection.id) expect(presence_client_two.members.local_members.values.first.connection_id).to eql(client_two.connection.id) presence_client_two.leave presence_client_two.subscribe(:leave) do expect(presence_client_two.members.local_members).to be_empty stop_reactor end end end end end describe '#RTP17b' do let(:leave_action) { Ably::Models::PresenceMessage::ACTION.Leave } it 'updates presence members on leave' do presence_client_two.subscribe(:enter) do channel_anonymous_client.attach do channel_anonymous_client.presence.get do |members| presence_client_two.subscribe(:leave) do expect(presence_client_two.members.local_members).to be_empty stop_reactor end leave_message = Ably::Models::PresenceMessage.new( 'id' => "#{client_two.connection.id}:#{presence_client_two.client_id}:1", 'clientId' => presence_client_two.client_id, 'connectionId' => client_two.connection.id, 'timestamp' => as_since_epoch(Time.now), 'action' => leave_action ) presence_client_two.__incoming_msgbus__.publish :presence, leave_message end end end presence_client_two.enter end it 'does no update presence members on fabricated leave' do presence_client_two.subscribe(:enter) do channel_anonymous_client.attach do channel_anonymous_client.presence.get do |members| presence_client_two.subscribe(:leave) do expect(presence_client_two.members.local_members).to_not be_empty stop_reactor end fabricated_leave_message = Ably::Models::PresenceMessage.new( 'id' => "#{client_two.connection.id}:#{presence_client_two.client_id}:1", 'clientId' => presence_client_two.client_id, 'connectionId' => "fabricated:#{presence_client_two.client_id}:0", 'timestamp' => as_since_epoch(Time.now), 'action' => leave_action ) presence_client_two.__incoming_msgbus__.publish :presence, fabricated_leave_message end end end presence_client_two.enter end end context 'when a channel becomes attached again' do let(:attached_action) { Ably::Models::ProtocolMessage::ACTION.Attached.to_i } let(:sync_action) { Ably::Models::ProtocolMessage::ACTION.Sync.to_i } let(:presence_action) { Ably::Models::ProtocolMessage::ACTION.Presence.to_i } let(:present_action) { Ably::Models::PresenceMessage::ACTION.Present.to_i } let(:resume_flag) { 4 } let(:presence_flag) { 1 } def fabricate_incoming_protocol_message(protocol_message) client_one.connection.__incoming_protocol_msgbus__.publish :protocol_message, protocol_message end # Prevents any messages from the WebSocket transport being sent / received # Connection protocol message subscriptions are still active, but nothing reaches or comes from the WebSocket transport def cripple_websocket_transport client_one.connection.transport.__incoming_protocol_msgbus__.unsubscribe client_one.connection.transport.__outgoing_protocol_msgbus__.unsubscribe end context 'and the resume flag is false' do context 'and the presence flag is false' do let(:member_data) { random_str } it 'immediately resends all local presence members (#RTP5c2, #RTP19a)' do in_sync_confirmed_no_local_members = false local_member_leave_event_fired = false presence_client_one.enter(member_data) presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter presence_client_one.subscribe(:leave) do |message| # The local member will leave the PresenceMap due to the ATTACHED without Presence local_member_leave_event_fired = true end # Local members re-entered automatically appear as updates due to the # fabricated ATTACHED message sent and the members already being present presence_client_one.subscribe(:update) do |message| expect(local_member_leave_event_fired).to be_truthy expect(message.data).to eq(member_data) expect(message.client_id).to eq(client_one.auth.client_id) EventMachine.next_tick do expect(presence_client_one.members.length).to eql(1) expect(presence_client_one.members.local_members.length).to eql(1) expect(in_sync_confirmed_no_local_members).to be_truthy stop_reactor end end presence_client_one.members.once(:in_sync) do # Immediately after SYNC (no sync actually occurred, but this event fires immediately after a channel SYNCs or is not expecting to SYNC) expect(presence_client_one.members.length).to eql(0) expect(presence_client_one.members.local_members.length).to eql(0) in_sync_confirmed_no_local_members = true end # ATTACHED ProtocolMessage with no presence flag will clear the presence set immediately, #RTP19a fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( action: attached_action, channel: channel_name, flags: 0 # no resume or presence flag ) end end end end context 'when re-entering a client automatically, if the re-enter fails for any reason' do let(:client_one_options) do client_options.merge(client_id: client_one_id, log_level: :error) end let(:client_one) { auto_close Ably::Realtime::Client.new(client_one_options) } it 'should emit an ErrorInfo with error code 91004 (#RTP5c3)' do presence_client_one.enter # Wait for client to be entered presence_client_one.subscribe(:enter) do # Local member should not be re-entered as the request to re-enter will timeout presence_client_one.subscribe(:update) do |message| raise "Unexpected update, this should not happen as the re-enter fails" end channel_client_one.on(:update) do |channel_state_change| next if channel_state_change.reason.nil? # first update is generated by the fabricated ATTACHED expect(channel_state_change.reason.code).to eql(91004) expect(channel_state_change.reason.message).to match(/#{client_one_id}/) expect(channel_state_change.reason.message).to match(/Fabricated/) # fabricated message expect(channel_state_change.reason.message).to match(/2345/) # fabricated error code stop_reactor end cripple_websocket_transport client_one.connection.__outgoing_protocol_msgbus__.subscribe do |protocol_message| if protocol_message.action == :presence # Fabricate a NACK for the re-enter message EventMachine.add_timer(0.1) do fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( action: Ably::Models::ProtocolMessage::ACTION.Nack.to_i , channel: channel_name, count: 1, msg_serial: protocol_message.message_serial, error: { message: 'Fabricated', code: 2345 } ) end end end # ATTACHED ProtocolMessage with no presence flag will clear the presence set immediately, #RTP19a fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( action: attached_action, channel: channel_name, flags: 0 # no resume or presence flag ) end end end end end context 'channel state side effects (RTP5)' do context 'channel transitions to the ATTACHED state (RTP5b)' do it 'all queued presence messages are sent' do channel_client_one.on(:attached) do client_one.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :presence expect(protocol_message.action).to eq(:presence) stop_reactor end end end presence_client_one.enter do channel_client_one.attach end end end context 'channel transitions to the FAILED state' do let(:anonymous_client) { auto_close Ably::Realtime::Client.new(client_options.merge(log_level: :fatal)) } let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(client_id: client_one_id, log_level: :fatal)) } it 'clears the PresenceMap and local member map copy and does not emit any presence events (#RTP5a)' do presence_client_one.enter presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter channel_anonymous_client.attach do presence_anonymous_client.get do |members| expect(members.count).to eq(1) presence_anonymous_client.subscribe { raise 'No presence events should be emitted' } channel_anonymous_client.transition_state_machine! :failed, reason: RuntimeError.new expect(presence_anonymous_client.members.length).to eq(0) expect(channel_anonymous_client).to be_failed presence_anonymous_client.unsubscribe # prevent events being sent to the channel from Ably as it is unaware it's FAILED expect(presence_client_one.members.local_members.count).to eq(1) channel_client_one.transition_state_machine! :failed expect(channel_client_one).to be_failed expect(presence_client_one.members.local_members.count).to eq(0) stop_reactor end end end end end context 'channel transitions to the DETACHED state' do it 'clears the PresenceMap and local member map copy and does not emit any presence events (#RTP5a)' do presence_client_one.enter presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter channel_anonymous_client.attach do presence_anonymous_client.get do |members| expect(members.count).to eq(1) presence_anonymous_client.subscribe { raise 'No presence events should be emitted' } channel_anonymous_client.detach do expect(presence_anonymous_client.members.length).to eq(0) expect(channel_anonymous_client).to be_detached expect(presence_client_one.members.local_members.count).to eq(1) channel_client_one.detach do expect(presence_client_one.members.local_members.count).to eq(0) stop_reactor end end end end end end end context 'channel transitions to the SUSPENDED state' do let(:auth_callback) do lambda do |token_params| # Pause to allow presence updates to occur whilst disconnected sleep 1 Ably::Rest::Client.new(client_options).auth.request_token end end let(:anonymous_client) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: auth_callback)) } it 'maintains the PresenceMap and only publishes presence event changes since the last attached state (#RTP5f)' do presence_client_one.enter do presence_client_two.enter end entered_count = 0 presence_client_one.subscribe(:enter) do entered_count += 1 next unless entered_count == 2 presence_client_one.unsubscribe :enter channel_anonymous_client.attach do presence_anonymous_client.get do |members| expect(members.count).to eq(2) received_events = [] presence_anonymous_client.subscribe do |presence_message| expect(presence_message.action).to eq(:leave) expect(presence_message.client_id).to eql(client_one_id) received_events << [:leave, presence_message.client_id] end # Kill the connection triggering an automatic reconnect and reattach of the channel that is about to put into the suspended state anonymous_client.connection.transport.close_connection_after_writing # Prevent the same connection resuming, we want a new connection and the channel SYNC to be sent anonymous_client.connection.reset_resume_info anonymous_client.connection.once(:disconnected) do # Move to the SUSPENDED state and check presence map intact channel_anonymous_client.transition_state_machine! :suspended # Change the presence map state on that channel by getting one member to leave whilst the connection for anonymous client is diconnected presence_client_one.leave # Whilst SUSPENDED and DISCONNECTED, a get of the PresenceMap should still reveal two members presence_anonymous_client.get(wait_for_sync: false) do |members| expect(members.count).to eq(2) channel_anonymous_client.once(:attached) do presence_anonymous_client.get do |members| expect(members.count).to eq(1) EventMachine.add_timer(0.5) do expect(received_events).to contain_exactly([:leave, client_one_id]) presence_anonymous_client.unsubscribe stop_reactor end end end end end end end end end end end end end