lib/submodules/ably-ruby/spec/acceptance/realtime/presence_spec.rb in ably-rest-0.9.3 vs lib/submodules/ably-ruby/spec/acceptance/realtime/presence_spec.rb in ably-rest-1.0.0

- old
+ new

@@ -40,13 +40,22 @@ end end def setup_test(method_name, args, options) if options[:enter_first] + acked = false + received = false presence_client_one.public_send(method_name.to_s.gsub(/leave|update/, 'enter'), args) do - yield + acked = true + yield if acked & received end + presence_client_one.subscribe do |presence_message| + expect(presence_message.action).to eq(:enter) + presence_client_one.unsubscribe + received = true + yield if acked & received + end else yield end end @@ -54,28 +63,72 @@ it 'raise an exception if the channel is detached' do setup_test(method_name, args, options) do channel_client_one.attach do channel_client_one.transition_state_machine :detaching channel_client_one.once(:detached) do - expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::InvalidStateChange, /Operation is not allowed when channel is in STATE.detached/i - stop_reactor + presence_client_one.public_send(method_name, args).tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + expect(error).to be_a(Ably::Exceptions::InvalidState) + expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/) + stop_reactor + end + end end end end end + it 'raise an exception if the channel becomes detached' do + setup_test(method_name, args, options) do + channel_client_one.attach do + channel_client_one.transition_state_machine :detaching + presence_client_one.public_send(method_name, args).tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + expect(error).to be_a(Ably::Exceptions::InvalidState) + expect(error.message).to match(/Operation failed as channel transitioned to STATE.Detached/) + stop_reactor + end + end + end + end + end + it 'raise an exception if the channel is failed' do setup_test(method_name, args, options) do channel_client_one.attach do channel_client_one.transition_state_machine :failed expect(channel_client_one.state).to eq(:failed) - expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::InvalidStateChange, /Operation is not allowed when channel is in STATE.failed/i - stop_reactor + presence_client_one.public_send(method_name, args).tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + expect(error).to be_a(Ably::Exceptions::InvalidState) + expect(error.message).to match(/Operation is not allowed when channel is in STATE.Failed/) + stop_reactor + end + end end end end + it 'raise an exception if the channel becomes failed' do + setup_test(method_name, args, options) do + channel_client_one.attach do + presence_client_one.public_send(method_name, args).tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + expect(error).to be_a(Ably::Exceptions::MessageDeliveryFailed) + stop_reactor + end + end + channel_client_one.transition_state_machine :failed + expect(channel_client_one.state).to eq(:failed) + end + end + end + it 'implicitly attaches the channel' do expect(channel_client_one).to_not be_attached presence_client_one.public_send(method_name, args) do expect(channel_client_one).to be_attached stop_reactor @@ -84,37 +137,43 @@ context 'when :queue_messages client option is false' do let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(queue_messages: false, client_id: client_id)) } context 'and connection state initialized' do - it 'raises an exception' do - expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::MessageQueueingDisabled + it 'fails the deferrable' do + presence_client_one.public_send(method_name, args).errback do |error| + expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled) + stop_reactor + end expect(client_one.connection).to be_initialized - stop_reactor end end context 'and connection state connecting' do - it 'raises an exception' do + it 'fails the deferrable' do client_one.connect EventMachine.next_tick do - expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::MessageQueueingDisabled + presence_client_one.public_send(method_name, args).errback do |error| + expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled) + stop_reactor + end expect(client_one.connection).to be_connecting - stop_reactor end end end context 'and connection state disconnected' do let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(queue_messages: false, client_id: client_id, :log_level => :error)) } - it 'raises an exception' do + it 'fails the deferrable' do client_one.connection.once(:connected) do client_one.connection.once(:disconnected) do - expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::MessageQueueingDisabled + presence_client_one.public_send(method_name, args).errback do |error| + expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled) + stop_reactor + end expect(client_one.connection).to be_disconnected - stop_reactor end client_one.connection.transition_state_machine :disconnected end end end @@ -256,11 +315,12 @@ end end it 'catches exceptions in the provided method block and logs them to the logger' do setup_test(method_name, args, options) do - expect(presence_client_one.logger).to receive(:error).with(/Intentional exception/) do + expect(presence_client_one.logger).to receive(:error) do |*args, &block| + expect(args.concat([block ? block.call : nil]).join(',')).to match(/Intentional exception/) stop_reactor end presence_client_one.public_send(method_name, args) { raise 'Intentional exception' } end end @@ -319,10 +379,17 @@ it 'throws an exception' do expect { presence_channel.public_send(method_name, nil) }.to raise_error Ably::Exceptions::IncompatibleClientId stop_reactor end end + + context 'and a client_id that is not a string type' do + it 'throws an exception' do + expect { presence_channel.public_send(method_name, 1) }.to raise_error Ably::Exceptions::IncompatibleClientId + stop_reactor + end + end end context ":#{method_name} when authenticated with a valid client_id" do let(:token) { Ably::Rest::Client.new(default_options).auth.request_token(client_id: 'valid').token } let(:client_options) { default_options.merge(key: nil, token: token) } @@ -407,18 +474,18 @@ end end end context 'when attached (but not present) on a presence channel with an anonymous client (no client ID)' do - it 'maintains state as other clients enter and leave the channel' do + it 'maintains state as other clients enter and leave the channel (#RTP2e)' do channel_anonymous_client.attach do presence_anonymous_client.subscribe(:enter) do |presence_message| expect(presence_message.client_id).to eql(client_one.client_id) presence_anonymous_client.get do |members| expect(members.first.client_id).to eql(client_one.client_id) - expect(members.first.action).to eq(:enter) + expect(members.first.action).to eq(:present) presence_anonymous_client.subscribe(:leave) do |leave_presence_message| expect(leave_presence_message.client_id).to eql(client_one.client_id) presence_anonymous_client.get do |members_once_left| @@ -434,11 +501,11 @@ presence_client_one.leave end end end - context '#members map', api_private: true do + context '#members map / PresenceMap (#RTP2)', api_private: true do it 'is available once the channel is created' do expect(presence_anonymous_client.members).to_not be_nil stop_reactor end @@ -466,11 +533,18 @@ end end context 'once server sync is complete' do it 'behaves like an Enumerable allowing direct access to current members' do - when_all(presence_client_one.enter, presence_client_two.enter) do + presence_client_one.enter + presence_client_two.enter + + entered = 0 + presence_client_one.subscribe(:enter) do + entered += 1 + next unless entered == 2 + presence_anonymous_client.members.once(:in_sync) do expect(presence_anonymous_client.members.count).to eql(2) member_ids = presence_anonymous_client.members.map(&:member_key) expect(member_ids.count).to eql(2) expect(member_ids.uniq.count).to eql(2) @@ -479,50 +553,224 @@ channel_anonymous_client.attach end end end + + context 'the map is based on the member_key (connection_id & client_id)' do + # 2 unqiue client_id with client_id "b" being on two connections + let(:enter_action) { 2 } + let(:presence_data) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action }, + { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: enter_action }, + { client_id: 'a', connection_id: 'one', id: 'one:0:2', action: enter_action }, + { client_id: 'b', connection_id: 'one', id: 'one:0:3', action: enter_action }, + { client_id: 'b', connection_id: 'two', id: 'two:0:4', action: enter_action } + ] + end + + it 'ensures uniqueness from this member_key (#RTP2a)' do + channel_anonymous_client.attach do + presence_anonymous_client.get do |members| + expect(members.length).to eql(0) + + ## Fabricate members + action = Ably::Models::ProtocolMessage::ACTION.Presence + presence_msg = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: 20, + channel: channel_name, + presence: presence_data, + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, presence_msg + + EventMachine.add_timer(0.5) do + presence_anonymous_client.get do |members| + expect(members.length).to eql(3) + expect(members.map { |member| member.client_id }.uniq).to contain_exactly('a', 'b') + stop_reactor + end + end + end + end + end + end + + context 'newness is compared based on PresenceMessage#id unless synthesized' do + let(:page_size) { 100 } + let(:enter_expected_count) { page_size + 1 } # 100 per page, this ensures we have more than one page so that we can test newness during sync + let(:enter_action) { 2 } + let(:leave_action) { 3 } + let(:now) { Time.now.to_i * 1000 } + let(:entered) { [] } + let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(auth_callback: wildcard_token)) } + + def setup_members_on(presence) + enter_expected_count.times do |indx| + # 10 messages per second max rate on simulation accounts + rate = indx.to_f / 10 + EventMachine.add_timer(rate) do + presence.enter_client("client:#{indx}") do |message| + entered << message + next unless entered.count == enter_expected_count + yield + end + end + end + end + + def allow_sync_fabricate_data_final_sync_and_assert_members + setup_members_on(presence_client_one) do + sync_pages_received = [] + anonymous_client.connection.once(:connected) do + anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + sync_pages_received << protocol_message + if sync_pages_received.count == 1 + action = Ably::Models::ProtocolMessage::ACTION.Presence + presence_msg = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: anonymous_client.connection.serial + 1, + channel: channel_name, + presence: presence_data, + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, presence_msg + + # Now simulate an end to the sync + action = Ably::Models::ProtocolMessage::ACTION.Sync + sync_msg = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: anonymous_client.connection.serial + 2, + channel: channel_name, + channel_serial: 'validserialprefix:', # with no part after the `:` this indicates the end to the SYNC + presence: [], + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_msg + + # Stop the next SYNC arriving + anonymous_client.connection.__incoming_protocol_msgbus__.unsubscribe + end + end + end + + presence_anonymous_client.get do |members| + expect(members.length).to eql(page_size + 2) + expect(members.find { |member| member.client_id == 'a' }).to be_nil + expect(members.find { |member| member.client_id == 'b' }.timestamp.to_i).to eql(now / 1000) + expect(members.find { |member| member.client_id == 'c' }.timestamp.to_i).to eql(now / 1000) + stop_reactor + end + end + end + end + + context 'when presence messages are synthesized' do + let(:presence_data) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action, timestamp: now }, # first messages from client, second fabricated + { client_id: 'a', connection_id: 'one', id: 'fabricated:0:1', action: leave_action, timestamp: now + 1 }, # leave after enter based on timestamp + { client_id: 'b', connection_id: 'one', id: 'one:0:2', action: enter_action, timestamp: now }, # first messages from client, second fabricated + { client_id: 'b', connection_id: 'one', id: 'fabricated:0:3', action: leave_action, timestamp: now - 1 }, # leave before enter based on timestamp + { client_id: 'c', connection_id: 'one', id: 'fabricated:0:4', action: enter_action, timestamp: now }, # both messages fabricated + { client_id: 'c', connection_id: 'one', id: 'fabricated:0:5', action: leave_action, timestamp: now - 1 } # leave before enter based on timestamp + ] + end + + it 'compares based on timestamp if either has a connectionId not part of the presence message id (#RTP2b1)' do + allow_sync_fabricate_data_final_sync_and_assert_members + end + end + + context 'when presence messages are not synthesized (events sent from clients)' do + let(:presence_data) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action, timestamp: now }, # first messages from client + { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action, timestamp: now - 1 }, # leave after enter based on msgSerial part of ID + { client_id: 'b', connection_id: 'one', id: 'one:2:2', action: enter_action, timestamp: now }, # first messages from client + { client_id: 'b', connection_id: 'one', id: 'one:2:1', action: leave_action, timestamp: now + 1 }, # leave before enter based on index part of ID + { client_id: 'c', connection_id: 'one', id: 'one:4:4', action: enter_action, timestamp: now }, # first messages from client + { client_id: 'c', connection_id: 'one', id: 'one:3:5', action: leave_action, timestamp: now + 1 } # leave before enter based on msgSerial part of ID + ] + end + + it 'compares based on timestamp if either has a connectionId not part of the presence message id (#RTP2b2)' do + allow_sync_fabricate_data_final_sync_and_assert_members + end + end + end end - context '#sync_complete?' do + context '#sync_complete? and SYNC flags (#RTP1)' do context 'when attaching to a channel without any members present' do - it 'is true and the presence channel is considered synced immediately' do + it 'sync_complete? is true, there is no presence flag, and the presence channel is considered synced immediately (#RTP1)' do + flag_checked = false + + anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :attached + flag_checked = true + expect(protocol_message.has_presence_flag?).to eql(false) + end + end + channel_anonymous_client.attach do expect(channel_anonymous_client.presence).to be_sync_complete - stop_reactor + EventMachine.next_tick do + expect(flag_checked).to eql(true) + stop_reactor + end end end end context 'when attaching to a channel with members present' do - it 'is false and the presence channel will subsequently be synced' do - presence_client_one.enter do + it 'sync_complete? is false, there is a presence flag, and the presence channel is subsequently synced (#RTP1)' do + flag_checked = false + + anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :attached + flag_checked = true + expect(protocol_message.has_presence_flag?).to eql(true) + end + end + + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + channel_anonymous_client.attach do expect(channel_anonymous_client.presence).to_not be_sync_complete - channel_anonymous_client.presence.get(wait_for_sync: true) do + channel_anonymous_client.presence.get do expect(channel_anonymous_client.presence).to be_sync_complete - stop_reactor + EventMachine.next_tick do + expect(flag_checked).to eql(true) + stop_reactor + end end end end end end end - context '250 existing (present) members on a channel (3 SYNC pages)' do - context 'requires at least 3 SYNC ProtocolMessages', em_timeout: 30 do - let(:enter_expected_count) { 250 } + context '101 existing (present) members on a channel (2 SYNC pages)' do + context 'requiring at least 2 SYNC ProtocolMessages', em_timeout: 40 do + let(:enter_expected_count) { 101 } let(:present) { [] } let(:entered) { [] } let(:sync_pages_received) { [] } let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } def setup_members_on(presence) - enter_expected_count.times do |index| + enter_expected_count.times do |indx| # 10 messages per second max rate on simulation accounts - EventMachine.add_timer(index / 10) do - presence.enter_client("client:#{index}") do |message| + rate = indx.to_f / 10 + EventMachine.add_timer(rate) do + presence.enter_client("client:#{indx}") do |message| entered << message next unless entered.count == enter_expected_count yield end end @@ -541,12 +789,51 @@ stop_reactor end end end + context 'and a member enters before the SYNC operation is complete' do + let(:enter_client_id) { random_str } + + it 'emits a :enter immediately and the member is :present once the sync is complete (#RTP2g)' do + setup_members_on(presence_client_one) do + member_entered = false + + anonymous_client.connect do + presence_anonymous_client.subscribe(:enter) do |member| + expect(member.client_id).to eql(enter_client_id) + member_entered = true + end + + presence_anonymous_client.get do |members| + expect(members.find { |member| member.client_id == enter_client_id }.action).to eq(:present) + stop_reactor + end + + anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :sync + sync_pages_received << protocol_message + if sync_pages_received.count == 1 + enter_action = Ably::Models::PresenceMessage::ACTION.Enter + enter_member = Ably::Models::PresenceMessage.new( + 'id' => "#{client_one.connection.id}:#{random_str}:0", + 'clientId' => enter_client_id, + 'connectionId' => client_one.connection.id, + 'timestamp' => as_since_epoch(Time.now), + 'action' => enter_action + ) + presence_anonymous_client.__incoming_msgbus__.publish :presence, enter_member + end + end + end + end + end + end + end + context 'and a member leaves before the SYNC operation is complete' do - it 'emits :leave immediately as the member leaves' do + it 'emits :leave immediately as the member leaves and cleans up the ABSENT member after (#RTP2f, #RTP2g)' do all_client_ids = enter_expected_count.times.map { |id| "client:#{id}" } setup_members_on(presence_client_one) do leave_member = nil @@ -556,11 +843,18 @@ end presence_anonymous_client.subscribe(:leave) do |leave_message| expect(leave_message.client_id).to eql(leave_member.client_id) expect(present.count).to be < enter_expected_count - EventMachine.add_timer(1) do + + # Hacky accessing a private method, but absent members are intentionally not exposed to any public APIs + expect(presence_anonymous_client.members.send(:absent_members).length).to eql(1) + + presence_anonymous_client.members.once(:in_sync) do + # Check that members count is exact indicating the members with LEAVE action after sync are removed + expect(presence_anonymous_client).to be_sync_complete + expect(presence_anonymous_client.members.length).to eql(enter_expected_count - 1) presence_anonymous_client.unsubscribe stop_reactor end end @@ -569,11 +863,11 @@ if protocol_message.action == :sync sync_pages_received << protocol_message if sync_pages_received.count == 1 leave_action = Ably::Models::PresenceMessage::ACTION.Leave leave_member = Ably::Models::PresenceMessage.new( - 'id' => "#{client_one.connection.id}-#{all_client_ids.first}:0", + 'id' => "#{client_one.connection.id}:#{all_client_ids.first}:0", 'clientId' => all_client_ids.first, 'connectionId' => client_one.connection.id, 'timestamp' => as_since_epoch(Time.now), 'action' => leave_action ) @@ -583,53 +877,56 @@ end end end end - it 'ignores presence events with timestamps prior to the current :present event in the MembersMap' do + it 'ignores presence events with timestamps / identifiers prior to the current :present event in the MembersMap (#RTP2c)' do started_at = Time.now setup_members_on(presence_client_one) do leave_member = nil presence_anonymous_client.subscribe(:present) do |present_message| present << present_message - leave_member = present_message unless leave_member if present.count == enter_expected_count presence_anonymous_client.get do |members| - expect(members.find { |member| member.client_id == leave_member.client_id}.action).to eq(:present) + member = members.find { |member| member.client_id == leave_member.client_id} + expect(member).to_not be_nil + expect(member.action).to eq(:present) EventMachine.add_timer(1) do presence_anonymous_client.unsubscribe stop_reactor end end end end presence_anonymous_client.subscribe(:leave) do |leave_message| - raise 'Leave event should not have been fired because it is out of date' + raise "Leave event for #{leave_message} should not have been fired because it is out of date" end anonymous_client.connect do anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message if sync_pages_received.count == 1 + first_member = protocol_message.presence[0] # get the first member in the SYNC set leave_action = Ably::Models::PresenceMessage::ACTION.Leave leave_member = Ably::Models::PresenceMessage.new( - leave_member.as_json.merge('action' => leave_action, 'timestamp' => as_since_epoch(started_at)) + first_member.as_json.merge('action' => leave_action, 'timestamp' => as_since_epoch(started_at)) ) + # After the SYNC has started, no inject that member has having left with a timestamp before the sync presence_anonymous_client.__incoming_msgbus__.publish :presence, leave_member end end end end end end - it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get with :wait_for_sync' do + it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get (#RTP2f)' do left_client = 10 left_client_id = "client:#{left_client}" setup_members_on(presence_client_one) do member_left_emitted = false @@ -647,11 +944,11 @@ end expect(leave_message.client_id).to eql(left_client_id) member_left_emitted = true end - presence_anonymous_client.get(wait_for_sync: true) do |members| + presence_anonymous_client.get do |members| expect(members.count).to eql(enter_expected_count - 1) expect(member_left_emitted).to eql(true) expect(members.map(&:client_id)).to_not include(left_client_id) EventMachine.add_timer(1) do presence_anonymous_client.unsubscribe @@ -660,11 +957,11 @@ end channel_anonymous_client.attach do leave_action = Ably::Models::PresenceMessage::ACTION.Leave fake_leave_presence_message = Ably::Models::PresenceMessage.new( - 'id' => "#{client_one.connection.id}-#{left_client_id}:0", + 'id' => "#{client_one.connection.id}:#{left_client_id}:0", 'clientId' => left_client_id, 'connectionId' => client_one.connection.id, 'timestamp' => as_since_epoch(Time.now), 'action' => leave_action ) @@ -674,39 +971,42 @@ end end end context '#get' do - context 'with :wait_for_sync option set to true' do - it 'waits until sync is complete', em_timeout: 30 do # allow for slow connections and lots of messages - enter_expected_count.times do |index| - EventMachine.add_timer(index / 10) do - presence_client_one.enter_client("client:#{index}") do |message| - entered << message - next unless entered.count == enter_expected_count + context 'by default' do + it 'waits until sync is complete (#RTP11c1)', em_timeout: 30 do # allow for slow connections and lots of messages + enter_expected_count.times do |indx| + EventMachine.add_timer(indx / 10) do + presence_client_one.enter_client "client:#{indx}" + end + end - presence_anonymous_client.get(wait_for_sync: true) do |members| - expect(members.map(&:client_id).uniq.count).to eql(enter_expected_count) - expect(members.count).to eql(enter_expected_count) - stop_reactor - end - end + presence_client_one.subscribe(:enter) do |message| + entered << message + next unless entered.count == enter_expected_count + + presence_anonymous_client.get do |members| + expect(members.map(&:client_id).uniq.count).to eql(enter_expected_count) + expect(members.count).to eql(enter_expected_count) + stop_reactor end end end end - context 'by default' do + context 'with :wait_for_sync option set to false (#RTP11c1)' do it 'it does not wait for sync', em_timeout: 30 do # allow for slow connections and lots of messages - enter_expected_count.times do |index| - EventMachine.add_timer(index / 10) do - presence_client_one.enter_client("client:#{index}") do |message| + enter_expected_count.times do |indx| + EventMachine.add_timer(indx / 10) do + presence_client_one.enter_client "client:#{indx}" + presence_client_one.subscribe(:enter) do |message| entered << message next unless entered.count == enter_expected_count channel_anonymous_client.attach do - presence_anonymous_client.get do |members| + presence_anonymous_client.get(wait_for_sync: false) do |members| expect(presence_anonymous_client.members).to_not be_in_sync expect(members.count).to eql(0) stop_reactor end end @@ -894,28 +1194,45 @@ end end context 'and sync is complete' do it 'does not cache members that have left' do - presence_client_one.enter enter_data do + enter_ack = false + + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + expect(presence_client_one.members).to be_in_sync expect(presence_client_one.members.send(:members).count).to eql(1) presence_client_one.leave data end + presence_client_one.enter(enter_data) do + enter_ack = true + end + presence_client_one.subscribe(:leave) do |presence_message| + presence_client_one.unsubscribe :leave expect(presence_message.data).to eql(data) expect(presence_client_one.members.send(:members).count).to eql(0) + expect(enter_ack).to eql(true) stop_reactor end end end end - it 'raises an exception if not entered' do - expect { channel_client_one.presence.leave }.to raise_error(Ably::Exceptions::Standard, /Unable to leave presence channel that is not entered/) - stop_reactor + it 'succeeds and does not emit an event (#RTP10d)' do + channel_client_one.presence.leave do + # allow enough time for leave event to (not) fire + EventMachine.add_timer(2) do + stop_reactor + end + end + channel_client_one.subscribe(:leave) do + raise "No leave event should fire" + end end it_should_behave_like 'a public presence method', :leave, :left, {}, enter_first: true end @@ -1191,32 +1508,79 @@ stop_reactor end end it 'catches exceptions in the provided method block' do - expect(presence_client_one.logger).to receive(:error).with(/Intentional exception/) do + expect(presence_client_one.logger).to receive(:error) do |*args, &block| + expect(args.concat([block ? block.call : nil]).join(',')).to match(/Intentional exception/) stop_reactor end presence_client_one.get { raise 'Intentional exception' } end - it 'raise an exception if the channel is detached' do + it 'implicitly attaches the channel (#RTP11b)' do + expect(channel_client_one).to be_initialized + presence_client_one.get do |members| + expect(channel_client_one).to be_attached + stop_reactor + end + end + + context 'when the channel is SUSPENDED' do + context 'with wait_for_sync: true' do + it 'results in an error with @code@ @91005@ and a @message@ stating that the presence state is out of sync (#RTP11d)' do + presence_client_one.enter do + channel_client_one.transition_state_machine! :suspended + presence_client_one.get(wait_for_sync: true).errback do |error| + expect(error.code).to eql(91005) + expect(error.message).to match(/presence state is out of sync/i) + stop_reactor + end + end + end + end + + context 'with wait_for_sync: false' do + it 'returns the current PresenceMap and does not wait for the channel to change to the ATTACHED state (#RTP11d)' do + presence_client_one.enter do + channel_client_one.transition_state_machine! :suspended + presence_client_one.get(wait_for_sync: false) do |members| + expect(channel_client_one).to be_suspended + stop_reactor + end + end + end + end + end + + it 'fails if the connection is DETACHED (#RTP11b)' do channel_client_one.attach do - channel_client_one.transition_state_machine :detaching - channel_client_one.once(:detached) do - expect { presence_client_one.get }.to raise_error Ably::Exceptions::InvalidStateChange, /Operation is not allowed when channel is in STATE.detached/i - stop_reactor + channel_client_one.detach do + presence_client_one.get.tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + expect(error).to be_a(Ably::Exceptions::InvalidState) + expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/) + stop_reactor + end + end end end end - it 'raise an exception if the channel is failed' do + it 'fails if the connection is FAILED (#RTP11b)' do channel_client_one.attach do channel_client_one.transition_state_machine :failed expect(channel_client_one.state).to eq(:failed) - expect { presence_client_one.get }.to raise_error Ably::Exceptions::InvalidStateChange, /Operation is not allowed when channel is in STATE.failed/i - stop_reactor + presence_client_one.get.tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + expect(error).to be_a(Ably::Exceptions::InvalidState) + expect(error.message).to match(/Operation is not allowed when channel is in STATE.Failed/) + stop_reactor + end + end end end context 'during a sync', em_timeout: 30 do let(:pages) { 2 } @@ -1238,11 +1602,11 @@ end end end context 'when :wait_for_sync is true' do - it 'fails if the connection fails' do + it 'fails if the connection becomes FAILED (#RTP11b)' do when_all(*connect_members_deferrables) do channel_client_two.attach do client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message @@ -1261,11 +1625,11 @@ end end end end - it 'fails if the channel is detached' do + it 'fails if the channel becomes detached (#RTP11b)' do when_all(*connect_members_deferrables) do channel_client_two.attach do client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync # prevent any more SYNC messages coming through @@ -1285,14 +1649,14 @@ end end end end - # skip 'it fails if the connection changes to failed state' - - it 'returns the current members on the channel' do - presence_client_one.enter do + it 'returns the current members on the channel (#RTP11a)' do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter presence_client_one.get do |members| expect(members.count).to eq(1) expect(client_one.client_id).to_not be_nil @@ -1302,11 +1666,11 @@ stop_reactor end end end - it 'filters by connection_id option if provided' do + it 'filters by connection_id option if provided (#RTP11c3)' do presence_client_one.enter do presence_client_two.enter end presence_client_one.subscribe(:enter) do |presence_message| @@ -1324,11 +1688,11 @@ end end end end - it 'filters by client_id option if provided' do + it 'filters by client_id option if provided (#RTP11c2)' do presence_client_one.enter do presence_client_two.enter end presence_client_one.subscribe(:enter) do |presence_message| @@ -1348,43 +1712,75 @@ end end end end - it 'does not wait for SYNC to complete if :wait_for_sync option is false' do - presence_client_one.enter do + it 'does not wait for SYNC to complete if :wait_for_sync option is false (#RTP11c1)' do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + presence_client_two.get(wait_for_sync: false) do |members| expect(members.count).to eql(0) stop_reactor end end end + it 'returns the list of members and waits for SYNC to complete by default (#RTP11a)' do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + + presence_client_two.get do |members| + expect(members.count).to eql(1) + stop_reactor + end + end + end + context 'when a member enters and then leaves' do it 'has no members' do presence_client_one.enter do - presence_client_one.leave do - presence_client_one.get do |members| - expect(members.count).to eq(0) - stop_reactor - end + presence_client_one.leave + end + + presence_client_one.subscribe(:leave) do + presence_client_one.get do |members| + expect(members.count).to eq(0) + stop_reactor end end end end + context 'when a member enters and the presence map is updated' do + it 'adds the member as being :present (#RTP2d)' do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + + presence_client_one.get do |members| + expect(members.count).to eq(1) + expect(members.first.action).to eq(:present) + stop_reactor + end + end + end + end + context 'with lots of members on different clients' do let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } let(:client_two) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) } let(:members_per_client) { 10 } let(:clients_entered) { Hash.new { |hash, key| hash[key] = 0 } } let(:total_members) { members_per_client * 2 } it 'returns a complete list of members on all clients' do - members_per_client.times do |index| - presence_client_one.enter_client("client_1:#{index}") - presence_client_two.enter_client("client_2:#{index}") + members_per_client.times do |indx| + presence_client_one.enter_client("client_1:#{indx}") + presence_client_two.enter_client("client_2:#{indx}") end presence_client_one.subscribe(:enter) do clients_entered[:client_one] += 1 end @@ -1467,11 +1863,13 @@ context 'with a callback that raises an exception' do let(:exception) { StandardError.new("Intentional error") } it 'logs the error and continues' do emitted_exception = false - expect(client_one.logger).to receive(:error).with(/#{exception.message}/) + expect(client_one.logger).to receive(:error) do |*args, &block| + expect(args.concat([block ? block.call : nil]).join(',')).to match(/#{exception.message}/) + end presence_client_one.subscribe do |presence_message| emitted_exception = true raise exception end presence_client_one.enter do @@ -1522,11 +1920,14 @@ end end context 'REST #get' do it 'returns current members' do - presence_client_one.enter(data_payload) do + presence_client_one.enter data_payload + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + members_page = channel_rest_client_one.presence.get this_member = members_page.items.first expect(this_member).to be_a(Ably::Models::PresenceMessage) expect(this_member.client_id).to eql(client_one.client_id) @@ -1536,11 +1937,14 @@ end end it 'returns no members once left' do presence_client_one.enter(data_payload) do - presence_client_one.leave do + presence_client_one.leave + presence_client_one.subscribe(:leave) do + presence_client_one.unsubscribe :leave + members_page = channel_rest_client_one.presence.get expect(members_page.items.count).to eql(0) stop_reactor end end @@ -1652,11 +2056,12 @@ end end context '#get' do it 'returns a list of members with decrypted data' do - encrypted_channel.presence.enter(data) do + encrypted_channel.presence.enter(data) + encrypted_channel.presence.subscribe(:enter) do encrypted_channel.presence.get do |members| member = members.first expect(member.encoding).to be_nil expect(member.data).to eql(data) stop_reactor @@ -1665,11 +2070,12 @@ end end context 'REST #get' do it 'returns a list of members with decrypted data' do - encrypted_channel.presence.enter(data) do + encrypted_channel.presence.enter(data) + encrypted_channel.presence.subscribe(:enter) do member = channel_rest_client_one.presence.get.items.first expect(member.encoding).to be_nil expect(member.data).to eql(data) stop_reactor end @@ -1694,13 +2100,12 @@ end end it 'emits an error when cipher does not match and presence data cannot be decoded' do incompatible_encrypted_channel.attach do - incompatible_encrypted_channel.on(:error) do |error| - expect(error).to be_a(Ably::Exceptions::CipherError) - expect(error.message).to match(/Cipher algorithm AES-128-CBC does not match/) + expect(client_two.logger).to receive(:error) do |*args, &block| + expect(args.concat([block ? block.call : nil]).join(',')).to match(/Cipher algorithm AES-128-CBC does not match/) stop_reactor end encrypted_channel.attach do encrypted_channel.presence.enter data @@ -1734,17 +2139,17 @@ end end end context 'connection failure mid-way through a large member sync' do - let(:members_count) { 250 } + let(:members_count) { 201 } let(:sync_pages_received) { [] } - let(:client_options) { default_options.merge(log_level: :error) } + let(:client_options) { default_options.merge(log_level: :fatal) } - it 'resumes the SYNC operation', em_timeout: 15 do - when_all(*members_count.times.map do |index| - presence_anonymous_client.enter_client("client:#{index}") + it 'resumes the SYNC operation (#RTP3)', em_timeout: 15 do + when_all(*members_count.times.map do |indx| + presence_anonymous_client.enter_client("client:#{indx}") end) do channel_client_two.attach do client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :sync sync_pages_received << protocol_message @@ -1755,9 +2160,659 @@ presence_client_two.get(wait_for_sync: true) do |members| expect(members.count).to eql(members_count) expect(members.map(&:member_key).uniq.count).to eql(members_count) stop_reactor + end + end + end + end + + context 'server-initiated sync' do + context 'with multiple SYNC pages' do + let(:present_action) { 1 } + let(:leave_action) { 3 } + let(:presence_sync_1) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action }, + { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action } + ] + end + let(:presence_sync_2) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action } + ] + end + + it 'is initiated with a SYNC message and completed with a later SYNC message with no cursor value part of the channelSerial (#RTP18a, #RTP18b) ', em_timeout: 15 do + presence_anonymous_client.get do |members| + expect(members.length).to eql(0) + expect(presence_anonymous_client).to be_sync_complete + + presence_anonymous_client.subscribe(:present) do + expect(presence_anonymous_client).to_not be_sync_complete + presence_anonymous_client.get do |members| + expect(presence_anonymous_client).to be_sync_complete + expect(members.length).to eql(1) + expect(members.first.client_id).to eql('b') + stop_reactor + end + end + + ## Fabricate server-initiated SYNC in two parts + action = Ably::Models::ProtocolMessage::ACTION.Sync + sync_message = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: 10, + channel_serial: 'sequenceid:cursor', + channel: channel_name, + presence: presence_sync_1, + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message + + sync_message = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: 11, + channel_serial: 'sequenceid:', # indicates SYNC is complete + channel: channel_name, + presence: presence_sync_2, + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message + end + end + end + + context 'with a single SYNC page' do + let(:present_action) { 1 } + let(:leave_action) { 3 } + let(:presence_sync) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action }, + { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action }, + { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action } + ] + end + + it 'is initiated and completed with a single SYNC message (and no channelSerial) (#RTP18a, #RTP18c) ', em_timeout: 15 do + presence_anonymous_client.get do |members| + expect(members.length).to eql(0) + expect(presence_anonymous_client).to be_sync_complete + + presence_anonymous_client.subscribe(:present) do + expect(presence_anonymous_client).to_not be_sync_complete + presence_anonymous_client.get do |members| + expect(presence_anonymous_client).to be_sync_complete + expect(members.length).to eql(1) + expect(members.first.client_id).to eql('b') + stop_reactor + end + end + + ## Fabricate server-initiated SYNC in two parts + action = Ably::Models::ProtocolMessage::ACTION.Sync + sync_message = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: 10, + channel: channel_name, + presence: presence_sync, + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message + end + end + end + + context 'when members exist in the PresenceMap before a SYNC completes' do + let(:enter_action) { Ably::Models::PresenceMessage::ACTION.Enter.to_i } + let(:present_action) { Ably::Models::PresenceMessage::ACTION.Present.to_i } + let(:presence_sync_protocol_message) do + [ + { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action }, + { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action } + ] + end + let(:presence_enter_message) do + Ably::Models::PresenceMessage.new( + 'id' => "#{random_str}:#{random_str}:0", + 'clientId' => random_str, + 'connectionId' => random_str, + 'timestamp' => as_since_epoch(Time.now), + 'action' => enter_action + ) + end + + it 'removes the members that are no longer present (#RTP19)', em_timeout: 15 do + presence_anonymous_client.get do |members| + expect(members.length).to eql(0) + + # Now inject a fake member into the PresenceMap by faking the receive of a Presence message from Ably into the Presence object + presence_anonymous_client.__incoming_msgbus__.publish :presence, presence_enter_message + + EventMachine.next_tick do + presence_anonymous_client.get do |members| + expect(members.length).to eql(1) + expect(members.first.client_id).to eql(presence_enter_message.client_id) + + presence_events = [] + presence_anonymous_client.subscribe do |presence_message| + presence_events << [presence_message.client_id, presence_message.action.to_sym] + if presence_message.action == :leave + expect(presence_message.id).to be_nil + expect(presence_message.timestamp.to_f * 1000).to be_within(20).of(Time.now.to_f * 1000) + end + end + + ## Fabricate server-initiated SYNC in two parts + action = Ably::Models::ProtocolMessage::ACTION.Sync + sync_message = Ably::Models::ProtocolMessage.new( + action: action, + connection_serial: 10, + channel: channel_name, + presence: presence_sync_protocol_message, + timestamp: Time.now.to_i * 1000 + ) + anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message + + EventMachine.next_tick do + presence_anonymous_client.get do |members| + expect(members.length).to eql(2) + expect(members.find { |member| member.client_id == presence_enter_message.client_id}).to be_nil + expect(presence_events).to contain_exactly( + ['a', :present], + ['b', :present], + [presence_enter_message.client_id, :leave] + ) + stop_reactor + end + end + end + end + end + end + end + end + + context 'when the client does not have presence subscribe privileges but is present on the channel' do + let(:present_only_capability) do + { channel_name => ["presence"] } + end + let(:present_only_callback) { Proc.new { Ably::Rest::Client.new(client_options).auth.request_token(client_id: '*', capability: present_only_capability) } } + let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: present_only_callback)) } + + it 'receives presence updates for all presence events generated by the current connection and the presence map is kept up to date (#RTP17a)' do + skip 'This functionality is not yet in sandbox, see https://github.com/ably/realtime/issues/656' + + enter_client_ids = [] + presence_client_one.subscribe(:enter) do |presence_message| + enter_client_ids << presence_message.client_id + end + + leave_client_ids = [] + presence_client_one.subscribe(:leave) do + leave_client_ids << presence_message.client_id + end + + presence_client_one.enter_client 'bob' do + presence_client_one.enter_client 'sarah' + end + + entered_count = 0 + presence_client_one.subscribe(:enter) do + entered_count += 1 + next unless entered_count == 2 + + presence_client_one.unsubscribe :enter + presence_client_one.get do |members| + EventMachine.add_timer(1) do + expect(members.map(&:client_id)).to contain_exactly('bob', 'sarah') + expect(enter_client_ids).to contain_exactly('bob', 'sarah') + + presence_client_one.leave_client 'bob' do + presence_client_one.leave_client 'sarah' + end + + leave_count = 0 + presence_client_one.subscribe(:leave) do + leave_count += 1 + next unless leave_count == 2 + + presence_client_one.get do |members| + expect(members.length).to eql(0) + expect(leave_client_ids).to contain_exactly('bob', 'sarah') + stop_reactor + end + end + end + end + end + end + end + + context "local PresenceMap for presence members entered by this client" do + it "maintains a copy of the member map for any member that shares this connection's connection ID (#RTP17)" do + presence_client_one.enter do + presence_client_two.enter + end + + entered_count = 0 + presence_client_one.subscribe(:enter) do + entered_count += 1 + next unless entered_count == 2 + channel_anonymous_client.attach do + channel_anonymous_client.presence.get do |members| + expect(channel_anonymous_client.presence.members.local_members).to be_empty + expect(presence_client_one.members.local_members.length).to eql(1) + expect(presence_client_one.members.local_members.values.first.connection_id).to eql(client_one.connection.id) + expect(presence_client_two.members.local_members.values.first.connection_id).to eql(client_two.connection.id) + presence_client_two.leave + presence_client_two.subscribe(:leave) do + expect(presence_client_two.members.local_members).to be_empty + stop_reactor + end + end + end + end + end + + context 'when a channel becomes attached again' do + let(:attached_action) { Ably::Models::ProtocolMessage::ACTION.Attached.to_i } + let(:sync_action) { Ably::Models::ProtocolMessage::ACTION.Sync.to_i } + let(:presence_action) { Ably::Models::ProtocolMessage::ACTION.Presence.to_i } + let(:present_action) { Ably::Models::PresenceMessage::ACTION.Present.to_i } + let(:resume_flag) { 4 } + let(:presence_flag) { 1 } + + def fabricate_incoming_protocol_message(protocol_message) + client_one.connection.__incoming_protocol_msgbus__.publish :protocol_message, protocol_message + end + + # Prevents any messages from the WebSocket transport being sent / received + # Connection protocol message subscriptions are still active, but nothing reaches or comes from the WebSocket transport + def cripple_websocket_transport + client_one.connection.transport.__incoming_protocol_msgbus__.unsubscribe + client_one.connection.transport.__outgoing_protocol_msgbus__.unsubscribe + end + + context 'and the resume flag is true' do + context 'and the presence flag is false' do + it 'does not send any presence events as the PresenceMap is in sync (#RTP5c1)' do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + + client_one.connection.transport.__outgoing_protocol_msgbus__.subscribe do |message| + raise "No presence state updates to Ably are expected. Message sent: #{message.to_json}" if client_one.connection.connected? + end + + cripple_websocket_transport + + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: attached_action, + channel: channel_name, + flags: resume_flag + ) + + EventMachine.add_timer(1) do + presence_client_one.get do |members| + expect(members.length).to eql(1) + expect(presence_client_one.members.local_members.length).to eql(1) + stop_reactor + end + end + end + end + end + + context 'and the presence flag is true' do + context 'and following the SYNC all local MemberMap members are present in the PresenceMap' do + it 'does nothing as MemberMap is in sync (#RTP5c2)' do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + + expect(presence_client_one.members.length).to eql(1) + expect(presence_client_one.members.local_members.length).to eql(1) + + presence_client_one.members.once(:in_sync) do + presence_client_one.get do |members| + expect(members.length).to eql(1) + expect(presence_client_one.members.local_members.length).to eql(1) + stop_reactor + end + end + + client_one.connection.transport.__outgoing_protocol_msgbus__.subscribe do |message| + raise "No presence state updates to Ably are expected. Message sent: #{message.to_json}" if client_one.connection.connected? + end + + cripple_websocket_transport + + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: attached_action, + channel: channel_name, + flags: resume_flag + presence_flag + ) + + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: sync_action, + channel: channel_name, + presence: presence_client_one.members.map(&:shallow_clone).map(&:as_json), + channelSerial: nil # no further SYNC messages expected + ) + end + end + end + + context 'and following the SYNC a local MemberMap member is not present in the PresenceMap' do + it 're-enters the missing members automatically (#RTP5c2)' do + sync_check_completed = false + + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + + expect(presence_client_one.members.length).to eql(1) + expect(presence_client_one.members.local_members.length).to eql(1) + + client_one.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |message| + next if message.action == :close # ignore finalization of connection + + expect(message.action).to eq(:presence) + presence_message = message.presence.first + expect(presence_message.action).to eq(:enter) + expect(presence_message.client_id).to eq(client_one.auth.client_id) + + presence_client_one.subscribe(:enter) do |message| + expect(message.connection_id).to eql(client_one.connection.id) + expect(message.client_id).to eq(client_one.auth.client_id) + + EventMachine.next_tick do + expect(presence_client_one.members.length).to eql(2) + expect(presence_client_one.members.local_members.length).to eql(1) + expect(sync_check_completed).to be_truthy + stop_reactor + end + end + + # Fabricate Ably sending back the Enter PresenceMessage to the client a short while after + # ensuring the PresenceMap for a short period does not have this member as to be expected in reality + EventMachine.add_timer(0.2) do + connection_id = random_str + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: presence_action, + channel: channel_name, + connectionId: client_one.connection.id, + connectionSerial: 50, + timestamp: as_since_epoch(Time.now), + presence: [presence_message.shallow_clone(id: "#{client_one.connection.id}:0:0", timestamp: as_since_epoch(Time.now)).as_json] + ) + end + end + + presence_client_one.members.once(:in_sync) do + # For a brief period, the client will have re-entered the missing members from the local_members + # but the enter from Ably will have not been received, so at this point the local_members will be empty + presence_client_one.get do |members| + expect(members.length).to eql(1) + expect(members.first.connection_id).to_not eql(client_one.connection.id) + expect(presence_client_one.members.local_members.length).to eql(0) + sync_check_completed = true + end + end + + cripple_websocket_transport + + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: attached_action, + channel: channel_name, + flags: resume_flag + presence_flag + ) + + # Complete the SYNC but without the member who was entered by this client + connection_id = random_str + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: sync_action, + channel: channel_name, + timestamp: as_since_epoch(Time.now), + presence: [{ id: "#{connection_id}:0:0", action: present_action, connection_id: connection_id, client_id: random_str }], + chanenlSerial: nil # no further SYNC messages expected + ) + end + end + end + end + end + + context 'and the resume flag is false' do + context 'and the presence flag is false' do + let(:member_data) { random_str } + + it 'immediately resends all local presence members (#RTP5c2, #RTP19a)' do + in_sync_confirmed_no_local_members = false + local_member_leave_event_fired = false + + presence_client_one.enter(member_data) + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + + presence_client_one.subscribe(:leave) do |message| + # The local member will leave the PresenceMap due to the ATTACHED without Presence + local_member_leave_event_fired = true + end + + # Local members re-entered automatically appear as updates due to the + # fabricated ATTACHED message sent and the members already being present + presence_client_one.subscribe(:update) do |message| + expect(local_member_leave_event_fired).to be_truthy + expect(message.data).to eq(member_data) + expect(message.client_id).to eq(client_one.auth.client_id) + EventMachine.next_tick do + expect(presence_client_one.members.length).to eql(1) + expect(presence_client_one.members.local_members.length).to eql(1) + expect(in_sync_confirmed_no_local_members).to be_truthy + stop_reactor + end + end + + presence_client_one.members.once(:in_sync) do + # Immediately after SYNC (no sync actually occurred, but this event fires immediately after a channel SYNCs or is not expecting to SYNC) + expect(presence_client_one.members.length).to eql(0) + expect(presence_client_one.members.local_members.length).to eql(0) + in_sync_confirmed_no_local_members = true + end + + # ATTACHED ProtocolMessage with no presence flag will clear the presence set immediately, #RTP19a + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: attached_action, + channel: channel_name, + flags: 0 # no resume or presence flag + ) + end + end + end + end + + context 'when re-entering a client automatically, if the re-enter fails for any reason' do + let(:client_one_options) do + client_options.merge(client_id: client_one_id, log_level: :error) + end + let(:client_one) { auto_close Ably::Realtime::Client.new(client_one_options) } + + it 'should emit an ErrorInfo with error code 91004 (#RTP5c3)' do + presence_client_one.enter + + # Wait for client to be entered + presence_client_one.subscribe(:enter) do + # Local member should not be re-entered as the request to re-enter will timeout + presence_client_one.subscribe(:update) do |message| + raise "Unexpected update, this should not happen as the re-enter fails" + end + + channel_client_one.on(:update) do |channel_state_change| + next if channel_state_change.reason.nil? # first update is generated by the fabricated ATTACHED + + expect(channel_state_change.reason.code).to eql(91004) + expect(channel_state_change.reason.message).to match(/#{client_one_id}/) + expect(channel_state_change.reason.message).to match(/Fabricated/) # fabricated message + expect(channel_state_change.reason.message).to match(/2345/) # fabricated error code + stop_reactor + end + + cripple_websocket_transport + + client_one.connection.__outgoing_protocol_msgbus__.subscribe do |protocol_message| + if protocol_message.action == :presence + # Fabricate a NACK for the re-enter message + EventMachine.add_timer(0.1) do + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: Ably::Models::ProtocolMessage::ACTION.Nack.to_i , + channel: channel_name, + count: 1, + msg_serial: protocol_message.message_serial, + error: { + message: 'Fabricated', + code: 2345 + } + ) + end + end + end + + # ATTACHED ProtocolMessage with no presence flag will clear the presence set immediately, #RTP19a + fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new( + action: attached_action, + channel: channel_name, + flags: 0 # no resume or presence flag + ) + end + end + end + end + end + + context 'channel state side effects' do + context 'channel transitions to the FAILED state' do + let(:anonymous_client) { auto_close Ably::Realtime::Client.new(client_options.merge(log_level: :fatal)) } + let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(client_id: client_one_id, log_level: :fatal)) } + + it 'clears the PresenceMap and local member map copy and does not emit any presence events (#RTP5a)' do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + + channel_anonymous_client.attach do + presence_anonymous_client.get do |members| + expect(members.count).to eq(1) + + presence_anonymous_client.subscribe { raise 'No presence events should be emitted' } + channel_anonymous_client.transition_state_machine! :failed, reason: RuntimeError.new + expect(presence_anonymous_client.members.length).to eq(0) + expect(channel_anonymous_client).to be_failed + presence_anonymous_client.unsubscribe # prevent events being sent to the channel from Ably as it is unaware it's FAILED + + expect(presence_client_one.members.local_members.count).to eq(1) + channel_client_one.transition_state_machine! :failed + expect(channel_client_one).to be_failed + expect(presence_client_one.members.local_members.count).to eq(0) + stop_reactor + end + end + end + end + end + + context 'channel transitions to the DETACHED state' do + it 'clears the PresenceMap and local member map copy and does not emit any presence events (#RTP5a)' do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter + + channel_anonymous_client.attach do + presence_anonymous_client.get do |members| + expect(members.count).to eq(1) + + presence_anonymous_client.subscribe { raise 'No presence events should be emitted' } + channel_anonymous_client.detach do + expect(presence_anonymous_client.members.length).to eq(0) + expect(channel_anonymous_client).to be_detached + + expect(presence_client_one.members.local_members.count).to eq(1) + channel_client_one.detach do + expect(presence_client_one.members.local_members.count).to eq(0) + stop_reactor + end + end + end + end + end + end + end + + context 'channel transitions to the SUSPENDED state' do + let(:auth_callback) do + Proc.new do + # Pause to allow presence updates to occur whilst disconnected + sleep 1 + Ably::Rest::Client.new(client_options).auth.request_token + end + end + let(:anonymous_client) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: auth_callback)) } + + it 'maintains the PresenceMap and only publishes presence event changes since the last attached state (#RTP5f)' do + presence_client_one.enter do + presence_client_two.enter + end + + entered_count = 0 + presence_client_one.subscribe(:enter) do + entered_count += 1 + next unless entered_count == 2 + + presence_client_one.unsubscribe :enter + channel_anonymous_client.attach do + presence_anonymous_client.get do |members| + expect(members.count).to eq(2) + + received_events = [] + presence_anonymous_client.subscribe do |presence_message| + expect(presence_message.action).to eq(:leave) + expect(presence_message.client_id).to eql(client_one_id) + received_events << [:leave, presence_message.client_id] + end + + # Kill the connection triggering an automatic reconnect and reattach of the channel that is about to put into the suspended state + anonymous_client.connection.transport.close_connection_after_writing + + # Prevent the same connection resuming, we want a new connection and the channel SYNC to be sent + anonymous_client.connection.reset_resume_info + + anonymous_client.connection.once(:disconnected) do + # Move to the SUSPENDED state and check presence map intact + channel_anonymous_client.transition_state_machine! :suspended + + # Change the presence map state on that channel by getting one member to leave whilst the connection for anonymous client is diconnected + presence_client_one.leave + + # Whilst SUSPENDED and DISCONNECTED, a get of the PresenceMap should still reveal two members + presence_anonymous_client.get(wait_for_sync: false) do |members| + expect(members.count).to eq(2) + + channel_anonymous_client.once(:attached) do + presence_anonymous_client.get do |members| + expect(members.count).to eq(1) + EventMachine.add_timer(0.5) do + expect(received_events).to contain_exactly([:leave, client_one_id]) + presence_anonymous_client.unsubscribe + stop_reactor + end + end + end + end + end + end + end end end end end end