lib/submodules/ably-ruby/spec/acceptance/realtime/presence_spec.rb in ably-rest-0.9.3 vs lib/submodules/ably-ruby/spec/acceptance/realtime/presence_spec.rb in ably-rest-1.0.0
- old
+ new
@@ -40,13 +40,22 @@
end
end
def setup_test(method_name, args, options)
if options[:enter_first]
+ acked = false
+ received = false
presence_client_one.public_send(method_name.to_s.gsub(/leave|update/, 'enter'), args) do
- yield
+ acked = true
+ yield if acked & received
end
+ presence_client_one.subscribe do |presence_message|
+ expect(presence_message.action).to eq(:enter)
+ presence_client_one.unsubscribe
+ received = true
+ yield if acked & received
+ end
else
yield
end
end
@@ -54,28 +63,72 @@
it 'raise an exception if the channel is detached' do
setup_test(method_name, args, options) do
channel_client_one.attach do
channel_client_one.transition_state_machine :detaching
channel_client_one.once(:detached) do
- expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::InvalidStateChange, /Operation is not allowed when channel is in STATE.detached/i
- stop_reactor
+ presence_client_one.public_send(method_name, args).tap do |deferrable|
+ deferrable.callback { raise 'Get should not succeed' }
+ deferrable.errback do |error|
+ expect(error).to be_a(Ably::Exceptions::InvalidState)
+ expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/)
+ stop_reactor
+ end
+ end
end
end
end
end
+ it 'raise an exception if the channel becomes detached' do
+ setup_test(method_name, args, options) do
+ channel_client_one.attach do
+ channel_client_one.transition_state_machine :detaching
+ presence_client_one.public_send(method_name, args).tap do |deferrable|
+ deferrable.callback { raise 'Get should not succeed' }
+ deferrable.errback do |error|
+ expect(error).to be_a(Ably::Exceptions::InvalidState)
+ expect(error.message).to match(/Operation failed as channel transitioned to STATE.Detached/)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
it 'raise an exception if the channel is failed' do
setup_test(method_name, args, options) do
channel_client_one.attach do
channel_client_one.transition_state_machine :failed
expect(channel_client_one.state).to eq(:failed)
- expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::InvalidStateChange, /Operation is not allowed when channel is in STATE.failed/i
- stop_reactor
+ presence_client_one.public_send(method_name, args).tap do |deferrable|
+ deferrable.callback { raise 'Get should not succeed' }
+ deferrable.errback do |error|
+ expect(error).to be_a(Ably::Exceptions::InvalidState)
+ expect(error.message).to match(/Operation is not allowed when channel is in STATE.Failed/)
+ stop_reactor
+ end
+ end
end
end
end
+ it 'raise an exception if the channel becomes failed' do
+ setup_test(method_name, args, options) do
+ channel_client_one.attach do
+ presence_client_one.public_send(method_name, args).tap do |deferrable|
+ deferrable.callback { raise 'Get should not succeed' }
+ deferrable.errback do |error|
+ expect(error).to be_a(Ably::Exceptions::MessageDeliveryFailed)
+ stop_reactor
+ end
+ end
+ channel_client_one.transition_state_machine :failed
+ expect(channel_client_one.state).to eq(:failed)
+ end
+ end
+ end
+
it 'implicitly attaches the channel' do
expect(channel_client_one).to_not be_attached
presence_client_one.public_send(method_name, args) do
expect(channel_client_one).to be_attached
stop_reactor
@@ -84,37 +137,43 @@
context 'when :queue_messages client option is false' do
let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(queue_messages: false, client_id: client_id)) }
context 'and connection state initialized' do
- it 'raises an exception' do
- expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::MessageQueueingDisabled
+ it 'fails the deferrable' do
+ presence_client_one.public_send(method_name, args).errback do |error|
+ expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled)
+ stop_reactor
+ end
expect(client_one.connection).to be_initialized
- stop_reactor
end
end
context 'and connection state connecting' do
- it 'raises an exception' do
+ it 'fails the deferrable' do
client_one.connect
EventMachine.next_tick do
- expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::MessageQueueingDisabled
+ presence_client_one.public_send(method_name, args).errback do |error|
+ expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled)
+ stop_reactor
+ end
expect(client_one.connection).to be_connecting
- stop_reactor
end
end
end
context 'and connection state disconnected' do
let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(queue_messages: false, client_id: client_id, :log_level => :error)) }
- it 'raises an exception' do
+ it 'fails the deferrable' do
client_one.connection.once(:connected) do
client_one.connection.once(:disconnected) do
- expect { presence_client_one.public_send(method_name, args) }.to raise_error Ably::Exceptions::MessageQueueingDisabled
+ presence_client_one.public_send(method_name, args).errback do |error|
+ expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled)
+ stop_reactor
+ end
expect(client_one.connection).to be_disconnected
- stop_reactor
end
client_one.connection.transition_state_machine :disconnected
end
end
end
@@ -256,11 +315,12 @@
end
end
it 'catches exceptions in the provided method block and logs them to the logger' do
setup_test(method_name, args, options) do
- expect(presence_client_one.logger).to receive(:error).with(/Intentional exception/) do
+ expect(presence_client_one.logger).to receive(:error) do |*args, &block|
+ expect(args.concat([block ? block.call : nil]).join(',')).to match(/Intentional exception/)
stop_reactor
end
presence_client_one.public_send(method_name, args) { raise 'Intentional exception' }
end
end
@@ -319,10 +379,17 @@
it 'throws an exception' do
expect { presence_channel.public_send(method_name, nil) }.to raise_error Ably::Exceptions::IncompatibleClientId
stop_reactor
end
end
+
+ context 'and a client_id that is not a string type' do
+ it 'throws an exception' do
+ expect { presence_channel.public_send(method_name, 1) }.to raise_error Ably::Exceptions::IncompatibleClientId
+ stop_reactor
+ end
+ end
end
context ":#{method_name} when authenticated with a valid client_id" do
let(:token) { Ably::Rest::Client.new(default_options).auth.request_token(client_id: 'valid').token }
let(:client_options) { default_options.merge(key: nil, token: token) }
@@ -407,18 +474,18 @@
end
end
end
context 'when attached (but not present) on a presence channel with an anonymous client (no client ID)' do
- it 'maintains state as other clients enter and leave the channel' do
+ it 'maintains state as other clients enter and leave the channel (#RTP2e)' do
channel_anonymous_client.attach do
presence_anonymous_client.subscribe(:enter) do |presence_message|
expect(presence_message.client_id).to eql(client_one.client_id)
presence_anonymous_client.get do |members|
expect(members.first.client_id).to eql(client_one.client_id)
- expect(members.first.action).to eq(:enter)
+ expect(members.first.action).to eq(:present)
presence_anonymous_client.subscribe(:leave) do |leave_presence_message|
expect(leave_presence_message.client_id).to eql(client_one.client_id)
presence_anonymous_client.get do |members_once_left|
@@ -434,11 +501,11 @@
presence_client_one.leave
end
end
end
- context '#members map', api_private: true do
+ context '#members map / PresenceMap (#RTP2)', api_private: true do
it 'is available once the channel is created' do
expect(presence_anonymous_client.members).to_not be_nil
stop_reactor
end
@@ -466,11 +533,18 @@
end
end
context 'once server sync is complete' do
it 'behaves like an Enumerable allowing direct access to current members' do
- when_all(presence_client_one.enter, presence_client_two.enter) do
+ presence_client_one.enter
+ presence_client_two.enter
+
+ entered = 0
+ presence_client_one.subscribe(:enter) do
+ entered += 1
+ next unless entered == 2
+
presence_anonymous_client.members.once(:in_sync) do
expect(presence_anonymous_client.members.count).to eql(2)
member_ids = presence_anonymous_client.members.map(&:member_key)
expect(member_ids.count).to eql(2)
expect(member_ids.uniq.count).to eql(2)
@@ -479,50 +553,224 @@
channel_anonymous_client.attach
end
end
end
+
+ context 'the map is based on the member_key (connection_id & client_id)' do
+ # 2 unqiue client_id with client_id "b" being on two connections
+ let(:enter_action) { 2 }
+ let(:presence_data) do
+ [
+ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action },
+ { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: enter_action },
+ { client_id: 'a', connection_id: 'one', id: 'one:0:2', action: enter_action },
+ { client_id: 'b', connection_id: 'one', id: 'one:0:3', action: enter_action },
+ { client_id: 'b', connection_id: 'two', id: 'two:0:4', action: enter_action }
+ ]
+ end
+
+ it 'ensures uniqueness from this member_key (#RTP2a)' do
+ channel_anonymous_client.attach do
+ presence_anonymous_client.get do |members|
+ expect(members.length).to eql(0)
+
+ ## Fabricate members
+ action = Ably::Models::ProtocolMessage::ACTION.Presence
+ presence_msg = Ably::Models::ProtocolMessage.new(
+ action: action,
+ connection_serial: 20,
+ channel: channel_name,
+ presence: presence_data,
+ timestamp: Time.now.to_i * 1000
+ )
+ anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, presence_msg
+
+ EventMachine.add_timer(0.5) do
+ presence_anonymous_client.get do |members|
+ expect(members.length).to eql(3)
+ expect(members.map { |member| member.client_id }.uniq).to contain_exactly('a', 'b')
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+
+ context 'newness is compared based on PresenceMessage#id unless synthesized' do
+ let(:page_size) { 100 }
+ let(:enter_expected_count) { page_size + 1 } # 100 per page, this ensures we have more than one page so that we can test newness during sync
+ let(:enter_action) { 2 }
+ let(:leave_action) { 3 }
+ let(:now) { Time.now.to_i * 1000 }
+ let(:entered) { [] }
+ let(:client_one) { auto_close Ably::Realtime::Client.new(default_options.merge(auth_callback: wildcard_token)) }
+
+ def setup_members_on(presence)
+ enter_expected_count.times do |indx|
+ # 10 messages per second max rate on simulation accounts
+ rate = indx.to_f / 10
+ EventMachine.add_timer(rate) do
+ presence.enter_client("client:#{indx}") do |message|
+ entered << message
+ next unless entered.count == enter_expected_count
+ yield
+ end
+ end
+ end
+ end
+
+ def allow_sync_fabricate_data_final_sync_and_assert_members
+ setup_members_on(presence_client_one) do
+ sync_pages_received = []
+ anonymous_client.connection.once(:connected) do
+ anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :sync
+ sync_pages_received << protocol_message
+ if sync_pages_received.count == 1
+ action = Ably::Models::ProtocolMessage::ACTION.Presence
+ presence_msg = Ably::Models::ProtocolMessage.new(
+ action: action,
+ connection_serial: anonymous_client.connection.serial + 1,
+ channel: channel_name,
+ presence: presence_data,
+ timestamp: Time.now.to_i * 1000
+ )
+ anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, presence_msg
+
+ # Now simulate an end to the sync
+ action = Ably::Models::ProtocolMessage::ACTION.Sync
+ sync_msg = Ably::Models::ProtocolMessage.new(
+ action: action,
+ connection_serial: anonymous_client.connection.serial + 2,
+ channel: channel_name,
+ channel_serial: 'validserialprefix:', # with no part after the `:` this indicates the end to the SYNC
+ presence: [],
+ timestamp: Time.now.to_i * 1000
+ )
+ anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_msg
+
+ # Stop the next SYNC arriving
+ anonymous_client.connection.__incoming_protocol_msgbus__.unsubscribe
+ end
+ end
+ end
+
+ presence_anonymous_client.get do |members|
+ expect(members.length).to eql(page_size + 2)
+ expect(members.find { |member| member.client_id == 'a' }).to be_nil
+ expect(members.find { |member| member.client_id == 'b' }.timestamp.to_i).to eql(now / 1000)
+ expect(members.find { |member| member.client_id == 'c' }.timestamp.to_i).to eql(now / 1000)
+ stop_reactor
+ end
+ end
+ end
+ end
+
+ context 'when presence messages are synthesized' do
+ let(:presence_data) do
+ [
+ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action, timestamp: now }, # first messages from client, second fabricated
+ { client_id: 'a', connection_id: 'one', id: 'fabricated:0:1', action: leave_action, timestamp: now + 1 }, # leave after enter based on timestamp
+ { client_id: 'b', connection_id: 'one', id: 'one:0:2', action: enter_action, timestamp: now }, # first messages from client, second fabricated
+ { client_id: 'b', connection_id: 'one', id: 'fabricated:0:3', action: leave_action, timestamp: now - 1 }, # leave before enter based on timestamp
+ { client_id: 'c', connection_id: 'one', id: 'fabricated:0:4', action: enter_action, timestamp: now }, # both messages fabricated
+ { client_id: 'c', connection_id: 'one', id: 'fabricated:0:5', action: leave_action, timestamp: now - 1 } # leave before enter based on timestamp
+ ]
+ end
+
+ it 'compares based on timestamp if either has a connectionId not part of the presence message id (#RTP2b1)' do
+ allow_sync_fabricate_data_final_sync_and_assert_members
+ end
+ end
+
+ context 'when presence messages are not synthesized (events sent from clients)' do
+ let(:presence_data) do
+ [
+ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: enter_action, timestamp: now }, # first messages from client
+ { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action, timestamp: now - 1 }, # leave after enter based on msgSerial part of ID
+ { client_id: 'b', connection_id: 'one', id: 'one:2:2', action: enter_action, timestamp: now }, # first messages from client
+ { client_id: 'b', connection_id: 'one', id: 'one:2:1', action: leave_action, timestamp: now + 1 }, # leave before enter based on index part of ID
+ { client_id: 'c', connection_id: 'one', id: 'one:4:4', action: enter_action, timestamp: now }, # first messages from client
+ { client_id: 'c', connection_id: 'one', id: 'one:3:5', action: leave_action, timestamp: now + 1 } # leave before enter based on msgSerial part of ID
+ ]
+ end
+
+ it 'compares based on timestamp if either has a connectionId not part of the presence message id (#RTP2b2)' do
+ allow_sync_fabricate_data_final_sync_and_assert_members
+ end
+ end
+ end
end
- context '#sync_complete?' do
+ context '#sync_complete? and SYNC flags (#RTP1)' do
context 'when attaching to a channel without any members present' do
- it 'is true and the presence channel is considered synced immediately' do
+ it 'sync_complete? is true, there is no presence flag, and the presence channel is considered synced immediately (#RTP1)' do
+ flag_checked = false
+
+ anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :attached
+ flag_checked = true
+ expect(protocol_message.has_presence_flag?).to eql(false)
+ end
+ end
+
channel_anonymous_client.attach do
expect(channel_anonymous_client.presence).to be_sync_complete
- stop_reactor
+ EventMachine.next_tick do
+ expect(flag_checked).to eql(true)
+ stop_reactor
+ end
end
end
end
context 'when attaching to a channel with members present' do
- it 'is false and the presence channel will subsequently be synced' do
- presence_client_one.enter do
+ it 'sync_complete? is false, there is a presence flag, and the presence channel is subsequently synced (#RTP1)' do
+ flag_checked = false
+
+ anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :attached
+ flag_checked = true
+ expect(protocol_message.has_presence_flag?).to eql(true)
+ end
+ end
+
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
channel_anonymous_client.attach do
expect(channel_anonymous_client.presence).to_not be_sync_complete
- channel_anonymous_client.presence.get(wait_for_sync: true) do
+ channel_anonymous_client.presence.get do
expect(channel_anonymous_client.presence).to be_sync_complete
- stop_reactor
+ EventMachine.next_tick do
+ expect(flag_checked).to eql(true)
+ stop_reactor
+ end
end
end
end
end
end
end
- context '250 existing (present) members on a channel (3 SYNC pages)' do
- context 'requires at least 3 SYNC ProtocolMessages', em_timeout: 30 do
- let(:enter_expected_count) { 250 }
+ context '101 existing (present) members on a channel (2 SYNC pages)' do
+ context 'requiring at least 2 SYNC ProtocolMessages', em_timeout: 40 do
+ let(:enter_expected_count) { 101 }
let(:present) { [] }
let(:entered) { [] }
let(:sync_pages_received) { [] }
let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) }
def setup_members_on(presence)
- enter_expected_count.times do |index|
+ enter_expected_count.times do |indx|
# 10 messages per second max rate on simulation accounts
- EventMachine.add_timer(index / 10) do
- presence.enter_client("client:#{index}") do |message|
+ rate = indx.to_f / 10
+ EventMachine.add_timer(rate) do
+ presence.enter_client("client:#{indx}") do |message|
entered << message
next unless entered.count == enter_expected_count
yield
end
end
@@ -541,12 +789,51 @@
stop_reactor
end
end
end
+ context 'and a member enters before the SYNC operation is complete' do
+ let(:enter_client_id) { random_str }
+
+ it 'emits a :enter immediately and the member is :present once the sync is complete (#RTP2g)' do
+ setup_members_on(presence_client_one) do
+ member_entered = false
+
+ anonymous_client.connect do
+ presence_anonymous_client.subscribe(:enter) do |member|
+ expect(member.client_id).to eql(enter_client_id)
+ member_entered = true
+ end
+
+ presence_anonymous_client.get do |members|
+ expect(members.find { |member| member.client_id == enter_client_id }.action).to eq(:present)
+ stop_reactor
+ end
+
+ anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :sync
+ sync_pages_received << protocol_message
+ if sync_pages_received.count == 1
+ enter_action = Ably::Models::PresenceMessage::ACTION.Enter
+ enter_member = Ably::Models::PresenceMessage.new(
+ 'id' => "#{client_one.connection.id}:#{random_str}:0",
+ 'clientId' => enter_client_id,
+ 'connectionId' => client_one.connection.id,
+ 'timestamp' => as_since_epoch(Time.now),
+ 'action' => enter_action
+ )
+ presence_anonymous_client.__incoming_msgbus__.publish :presence, enter_member
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+
context 'and a member leaves before the SYNC operation is complete' do
- it 'emits :leave immediately as the member leaves' do
+ it 'emits :leave immediately as the member leaves and cleans up the ABSENT member after (#RTP2f, #RTP2g)' do
all_client_ids = enter_expected_count.times.map { |id| "client:#{id}" }
setup_members_on(presence_client_one) do
leave_member = nil
@@ -556,11 +843,18 @@
end
presence_anonymous_client.subscribe(:leave) do |leave_message|
expect(leave_message.client_id).to eql(leave_member.client_id)
expect(present.count).to be < enter_expected_count
- EventMachine.add_timer(1) do
+
+ # Hacky accessing a private method, but absent members are intentionally not exposed to any public APIs
+ expect(presence_anonymous_client.members.send(:absent_members).length).to eql(1)
+
+ presence_anonymous_client.members.once(:in_sync) do
+ # Check that members count is exact indicating the members with LEAVE action after sync are removed
+ expect(presence_anonymous_client).to be_sync_complete
+ expect(presence_anonymous_client.members.length).to eql(enter_expected_count - 1)
presence_anonymous_client.unsubscribe
stop_reactor
end
end
@@ -569,11 +863,11 @@
if protocol_message.action == :sync
sync_pages_received << protocol_message
if sync_pages_received.count == 1
leave_action = Ably::Models::PresenceMessage::ACTION.Leave
leave_member = Ably::Models::PresenceMessage.new(
- 'id' => "#{client_one.connection.id}-#{all_client_ids.first}:0",
+ 'id' => "#{client_one.connection.id}:#{all_client_ids.first}:0",
'clientId' => all_client_ids.first,
'connectionId' => client_one.connection.id,
'timestamp' => as_since_epoch(Time.now),
'action' => leave_action
)
@@ -583,53 +877,56 @@
end
end
end
end
- it 'ignores presence events with timestamps prior to the current :present event in the MembersMap' do
+ it 'ignores presence events with timestamps / identifiers prior to the current :present event in the MembersMap (#RTP2c)' do
started_at = Time.now
setup_members_on(presence_client_one) do
leave_member = nil
presence_anonymous_client.subscribe(:present) do |present_message|
present << present_message
- leave_member = present_message unless leave_member
if present.count == enter_expected_count
presence_anonymous_client.get do |members|
- expect(members.find { |member| member.client_id == leave_member.client_id}.action).to eq(:present)
+ member = members.find { |member| member.client_id == leave_member.client_id}
+ expect(member).to_not be_nil
+ expect(member.action).to eq(:present)
EventMachine.add_timer(1) do
presence_anonymous_client.unsubscribe
stop_reactor
end
end
end
end
presence_anonymous_client.subscribe(:leave) do |leave_message|
- raise 'Leave event should not have been fired because it is out of date'
+ raise "Leave event for #{leave_message} should not have been fired because it is out of date"
end
anonymous_client.connect do
anonymous_client.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
if protocol_message.action == :sync
sync_pages_received << protocol_message
if sync_pages_received.count == 1
+ first_member = protocol_message.presence[0] # get the first member in the SYNC set
leave_action = Ably::Models::PresenceMessage::ACTION.Leave
leave_member = Ably::Models::PresenceMessage.new(
- leave_member.as_json.merge('action' => leave_action, 'timestamp' => as_since_epoch(started_at))
+ first_member.as_json.merge('action' => leave_action, 'timestamp' => as_since_epoch(started_at))
)
+ # After the SYNC has started, no inject that member has having left with a timestamp before the sync
presence_anonymous_client.__incoming_msgbus__.publish :presence, leave_member
end
end
end
end
end
end
- it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get with :wait_for_sync' do
+ it 'does not emit :present after the :leave event has been emitted, and that member is not included in the list of members via #get (#RTP2f)' do
left_client = 10
left_client_id = "client:#{left_client}"
setup_members_on(presence_client_one) do
member_left_emitted = false
@@ -647,11 +944,11 @@
end
expect(leave_message.client_id).to eql(left_client_id)
member_left_emitted = true
end
- presence_anonymous_client.get(wait_for_sync: true) do |members|
+ presence_anonymous_client.get do |members|
expect(members.count).to eql(enter_expected_count - 1)
expect(member_left_emitted).to eql(true)
expect(members.map(&:client_id)).to_not include(left_client_id)
EventMachine.add_timer(1) do
presence_anonymous_client.unsubscribe
@@ -660,11 +957,11 @@
end
channel_anonymous_client.attach do
leave_action = Ably::Models::PresenceMessage::ACTION.Leave
fake_leave_presence_message = Ably::Models::PresenceMessage.new(
- 'id' => "#{client_one.connection.id}-#{left_client_id}:0",
+ 'id' => "#{client_one.connection.id}:#{left_client_id}:0",
'clientId' => left_client_id,
'connectionId' => client_one.connection.id,
'timestamp' => as_since_epoch(Time.now),
'action' => leave_action
)
@@ -674,39 +971,42 @@
end
end
end
context '#get' do
- context 'with :wait_for_sync option set to true' do
- it 'waits until sync is complete', em_timeout: 30 do # allow for slow connections and lots of messages
- enter_expected_count.times do |index|
- EventMachine.add_timer(index / 10) do
- presence_client_one.enter_client("client:#{index}") do |message|
- entered << message
- next unless entered.count == enter_expected_count
+ context 'by default' do
+ it 'waits until sync is complete (#RTP11c1)', em_timeout: 30 do # allow for slow connections and lots of messages
+ enter_expected_count.times do |indx|
+ EventMachine.add_timer(indx / 10) do
+ presence_client_one.enter_client "client:#{indx}"
+ end
+ end
- presence_anonymous_client.get(wait_for_sync: true) do |members|
- expect(members.map(&:client_id).uniq.count).to eql(enter_expected_count)
- expect(members.count).to eql(enter_expected_count)
- stop_reactor
- end
- end
+ presence_client_one.subscribe(:enter) do |message|
+ entered << message
+ next unless entered.count == enter_expected_count
+
+ presence_anonymous_client.get do |members|
+ expect(members.map(&:client_id).uniq.count).to eql(enter_expected_count)
+ expect(members.count).to eql(enter_expected_count)
+ stop_reactor
end
end
end
end
- context 'by default' do
+ context 'with :wait_for_sync option set to false (#RTP11c1)' do
it 'it does not wait for sync', em_timeout: 30 do # allow for slow connections and lots of messages
- enter_expected_count.times do |index|
- EventMachine.add_timer(index / 10) do
- presence_client_one.enter_client("client:#{index}") do |message|
+ enter_expected_count.times do |indx|
+ EventMachine.add_timer(indx / 10) do
+ presence_client_one.enter_client "client:#{indx}"
+ presence_client_one.subscribe(:enter) do |message|
entered << message
next unless entered.count == enter_expected_count
channel_anonymous_client.attach do
- presence_anonymous_client.get do |members|
+ presence_anonymous_client.get(wait_for_sync: false) do |members|
expect(presence_anonymous_client.members).to_not be_in_sync
expect(members.count).to eql(0)
stop_reactor
end
end
@@ -894,28 +1194,45 @@
end
end
context 'and sync is complete' do
it 'does not cache members that have left' do
- presence_client_one.enter enter_data do
+ enter_ack = false
+
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
expect(presence_client_one.members).to be_in_sync
expect(presence_client_one.members.send(:members).count).to eql(1)
presence_client_one.leave data
end
+ presence_client_one.enter(enter_data) do
+ enter_ack = true
+ end
+
presence_client_one.subscribe(:leave) do |presence_message|
+ presence_client_one.unsubscribe :leave
expect(presence_message.data).to eql(data)
expect(presence_client_one.members.send(:members).count).to eql(0)
+ expect(enter_ack).to eql(true)
stop_reactor
end
end
end
end
- it 'raises an exception if not entered' do
- expect { channel_client_one.presence.leave }.to raise_error(Ably::Exceptions::Standard, /Unable to leave presence channel that is not entered/)
- stop_reactor
+ it 'succeeds and does not emit an event (#RTP10d)' do
+ channel_client_one.presence.leave do
+ # allow enough time for leave event to (not) fire
+ EventMachine.add_timer(2) do
+ stop_reactor
+ end
+ end
+ channel_client_one.subscribe(:leave) do
+ raise "No leave event should fire"
+ end
end
it_should_behave_like 'a public presence method', :leave, :left, {}, enter_first: true
end
@@ -1191,32 +1508,79 @@
stop_reactor
end
end
it 'catches exceptions in the provided method block' do
- expect(presence_client_one.logger).to receive(:error).with(/Intentional exception/) do
+ expect(presence_client_one.logger).to receive(:error) do |*args, &block|
+ expect(args.concat([block ? block.call : nil]).join(',')).to match(/Intentional exception/)
stop_reactor
end
presence_client_one.get { raise 'Intentional exception' }
end
- it 'raise an exception if the channel is detached' do
+ it 'implicitly attaches the channel (#RTP11b)' do
+ expect(channel_client_one).to be_initialized
+ presence_client_one.get do |members|
+ expect(channel_client_one).to be_attached
+ stop_reactor
+ end
+ end
+
+ context 'when the channel is SUSPENDED' do
+ context 'with wait_for_sync: true' do
+ it 'results in an error with @code@ @91005@ and a @message@ stating that the presence state is out of sync (#RTP11d)' do
+ presence_client_one.enter do
+ channel_client_one.transition_state_machine! :suspended
+ presence_client_one.get(wait_for_sync: true).errback do |error|
+ expect(error.code).to eql(91005)
+ expect(error.message).to match(/presence state is out of sync/i)
+ stop_reactor
+ end
+ end
+ end
+ end
+
+ context 'with wait_for_sync: false' do
+ it 'returns the current PresenceMap and does not wait for the channel to change to the ATTACHED state (#RTP11d)' do
+ presence_client_one.enter do
+ channel_client_one.transition_state_machine! :suspended
+ presence_client_one.get(wait_for_sync: false) do |members|
+ expect(channel_client_one).to be_suspended
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
+ it 'fails if the connection is DETACHED (#RTP11b)' do
channel_client_one.attach do
- channel_client_one.transition_state_machine :detaching
- channel_client_one.once(:detached) do
- expect { presence_client_one.get }.to raise_error Ably::Exceptions::InvalidStateChange, /Operation is not allowed when channel is in STATE.detached/i
- stop_reactor
+ channel_client_one.detach do
+ presence_client_one.get.tap do |deferrable|
+ deferrable.callback { raise 'Get should not succeed' }
+ deferrable.errback do |error|
+ expect(error).to be_a(Ably::Exceptions::InvalidState)
+ expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/)
+ stop_reactor
+ end
+ end
end
end
end
- it 'raise an exception if the channel is failed' do
+ it 'fails if the connection is FAILED (#RTP11b)' do
channel_client_one.attach do
channel_client_one.transition_state_machine :failed
expect(channel_client_one.state).to eq(:failed)
- expect { presence_client_one.get }.to raise_error Ably::Exceptions::InvalidStateChange, /Operation is not allowed when channel is in STATE.failed/i
- stop_reactor
+ presence_client_one.get.tap do |deferrable|
+ deferrable.callback { raise 'Get should not succeed' }
+ deferrable.errback do |error|
+ expect(error).to be_a(Ably::Exceptions::InvalidState)
+ expect(error.message).to match(/Operation is not allowed when channel is in STATE.Failed/)
+ stop_reactor
+ end
+ end
end
end
context 'during a sync', em_timeout: 30 do
let(:pages) { 2 }
@@ -1238,11 +1602,11 @@
end
end
end
context 'when :wait_for_sync is true' do
- it 'fails if the connection fails' do
+ it 'fails if the connection becomes FAILED (#RTP11b)' do
when_all(*connect_members_deferrables) do
channel_client_two.attach do
client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
if protocol_message.action == :sync
sync_pages_received << protocol_message
@@ -1261,11 +1625,11 @@
end
end
end
end
- it 'fails if the channel is detached' do
+ it 'fails if the channel becomes detached (#RTP11b)' do
when_all(*connect_members_deferrables) do
channel_client_two.attach do
client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
if protocol_message.action == :sync
# prevent any more SYNC messages coming through
@@ -1285,14 +1649,14 @@
end
end
end
end
- # skip 'it fails if the connection changes to failed state'
-
- it 'returns the current members on the channel' do
- presence_client_one.enter do
+ it 'returns the current members on the channel (#RTP11a)' do
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
presence_client_one.get do |members|
expect(members.count).to eq(1)
expect(client_one.client_id).to_not be_nil
@@ -1302,11 +1666,11 @@
stop_reactor
end
end
end
- it 'filters by connection_id option if provided' do
+ it 'filters by connection_id option if provided (#RTP11c3)' do
presence_client_one.enter do
presence_client_two.enter
end
presence_client_one.subscribe(:enter) do |presence_message|
@@ -1324,11 +1688,11 @@
end
end
end
end
- it 'filters by client_id option if provided' do
+ it 'filters by client_id option if provided (#RTP11c2)' do
presence_client_one.enter do
presence_client_two.enter
end
presence_client_one.subscribe(:enter) do |presence_message|
@@ -1348,43 +1712,75 @@
end
end
end
end
- it 'does not wait for SYNC to complete if :wait_for_sync option is false' do
- presence_client_one.enter do
+ it 'does not wait for SYNC to complete if :wait_for_sync option is false (#RTP11c1)' do
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
presence_client_two.get(wait_for_sync: false) do |members|
expect(members.count).to eql(0)
stop_reactor
end
end
end
+ it 'returns the list of members and waits for SYNC to complete by default (#RTP11a)' do
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
+ presence_client_two.get do |members|
+ expect(members.count).to eql(1)
+ stop_reactor
+ end
+ end
+ end
+
context 'when a member enters and then leaves' do
it 'has no members' do
presence_client_one.enter do
- presence_client_one.leave do
- presence_client_one.get do |members|
- expect(members.count).to eq(0)
- stop_reactor
- end
+ presence_client_one.leave
+ end
+
+ presence_client_one.subscribe(:leave) do
+ presence_client_one.get do |members|
+ expect(members.count).to eq(0)
+ stop_reactor
end
end
end
end
+ context 'when a member enters and the presence map is updated' do
+ it 'adds the member as being :present (#RTP2d)' do
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
+ presence_client_one.get do |members|
+ expect(members.count).to eq(1)
+ expect(members.first.action).to eq(:present)
+ stop_reactor
+ end
+ end
+ end
+ end
+
context 'with lots of members on different clients' do
let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) }
let(:client_two) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: wildcard_token)) }
let(:members_per_client) { 10 }
let(:clients_entered) { Hash.new { |hash, key| hash[key] = 0 } }
let(:total_members) { members_per_client * 2 }
it 'returns a complete list of members on all clients' do
- members_per_client.times do |index|
- presence_client_one.enter_client("client_1:#{index}")
- presence_client_two.enter_client("client_2:#{index}")
+ members_per_client.times do |indx|
+ presence_client_one.enter_client("client_1:#{indx}")
+ presence_client_two.enter_client("client_2:#{indx}")
end
presence_client_one.subscribe(:enter) do
clients_entered[:client_one] += 1
end
@@ -1467,11 +1863,13 @@
context 'with a callback that raises an exception' do
let(:exception) { StandardError.new("Intentional error") }
it 'logs the error and continues' do
emitted_exception = false
- expect(client_one.logger).to receive(:error).with(/#{exception.message}/)
+ expect(client_one.logger).to receive(:error) do |*args, &block|
+ expect(args.concat([block ? block.call : nil]).join(',')).to match(/#{exception.message}/)
+ end
presence_client_one.subscribe do |presence_message|
emitted_exception = true
raise exception
end
presence_client_one.enter do
@@ -1522,11 +1920,14 @@
end
end
context 'REST #get' do
it 'returns current members' do
- presence_client_one.enter(data_payload) do
+ presence_client_one.enter data_payload
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
members_page = channel_rest_client_one.presence.get
this_member = members_page.items.first
expect(this_member).to be_a(Ably::Models::PresenceMessage)
expect(this_member.client_id).to eql(client_one.client_id)
@@ -1536,11 +1937,14 @@
end
end
it 'returns no members once left' do
presence_client_one.enter(data_payload) do
- presence_client_one.leave do
+ presence_client_one.leave
+ presence_client_one.subscribe(:leave) do
+ presence_client_one.unsubscribe :leave
+
members_page = channel_rest_client_one.presence.get
expect(members_page.items.count).to eql(0)
stop_reactor
end
end
@@ -1652,11 +2056,12 @@
end
end
context '#get' do
it 'returns a list of members with decrypted data' do
- encrypted_channel.presence.enter(data) do
+ encrypted_channel.presence.enter(data)
+ encrypted_channel.presence.subscribe(:enter) do
encrypted_channel.presence.get do |members|
member = members.first
expect(member.encoding).to be_nil
expect(member.data).to eql(data)
stop_reactor
@@ -1665,11 +2070,12 @@
end
end
context 'REST #get' do
it 'returns a list of members with decrypted data' do
- encrypted_channel.presence.enter(data) do
+ encrypted_channel.presence.enter(data)
+ encrypted_channel.presence.subscribe(:enter) do
member = channel_rest_client_one.presence.get.items.first
expect(member.encoding).to be_nil
expect(member.data).to eql(data)
stop_reactor
end
@@ -1694,13 +2100,12 @@
end
end
it 'emits an error when cipher does not match and presence data cannot be decoded' do
incompatible_encrypted_channel.attach do
- incompatible_encrypted_channel.on(:error) do |error|
- expect(error).to be_a(Ably::Exceptions::CipherError)
- expect(error.message).to match(/Cipher algorithm AES-128-CBC does not match/)
+ expect(client_two.logger).to receive(:error) do |*args, &block|
+ expect(args.concat([block ? block.call : nil]).join(',')).to match(/Cipher algorithm AES-128-CBC does not match/)
stop_reactor
end
encrypted_channel.attach do
encrypted_channel.presence.enter data
@@ -1734,17 +2139,17 @@
end
end
end
context 'connection failure mid-way through a large member sync' do
- let(:members_count) { 250 }
+ let(:members_count) { 201 }
let(:sync_pages_received) { [] }
- let(:client_options) { default_options.merge(log_level: :error) }
+ let(:client_options) { default_options.merge(log_level: :fatal) }
- it 'resumes the SYNC operation', em_timeout: 15 do
- when_all(*members_count.times.map do |index|
- presence_anonymous_client.enter_client("client:#{index}")
+ it 'resumes the SYNC operation (#RTP3)', em_timeout: 15 do
+ when_all(*members_count.times.map do |indx|
+ presence_anonymous_client.enter_client("client:#{indx}")
end) do
channel_client_two.attach do
client_two.connection.transport.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
if protocol_message.action == :sync
sync_pages_received << protocol_message
@@ -1755,9 +2160,659 @@
presence_client_two.get(wait_for_sync: true) do |members|
expect(members.count).to eql(members_count)
expect(members.map(&:member_key).uniq.count).to eql(members_count)
stop_reactor
+ end
+ end
+ end
+ end
+
+ context 'server-initiated sync' do
+ context 'with multiple SYNC pages' do
+ let(:present_action) { 1 }
+ let(:leave_action) { 3 }
+ let(:presence_sync_1) do
+ [
+ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action },
+ { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action }
+ ]
+ end
+ let(:presence_sync_2) do
+ [
+ { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action }
+ ]
+ end
+
+ it 'is initiated with a SYNC message and completed with a later SYNC message with no cursor value part of the channelSerial (#RTP18a, #RTP18b) ', em_timeout: 15 do
+ presence_anonymous_client.get do |members|
+ expect(members.length).to eql(0)
+ expect(presence_anonymous_client).to be_sync_complete
+
+ presence_anonymous_client.subscribe(:present) do
+ expect(presence_anonymous_client).to_not be_sync_complete
+ presence_anonymous_client.get do |members|
+ expect(presence_anonymous_client).to be_sync_complete
+ expect(members.length).to eql(1)
+ expect(members.first.client_id).to eql('b')
+ stop_reactor
+ end
+ end
+
+ ## Fabricate server-initiated SYNC in two parts
+ action = Ably::Models::ProtocolMessage::ACTION.Sync
+ sync_message = Ably::Models::ProtocolMessage.new(
+ action: action,
+ connection_serial: 10,
+ channel_serial: 'sequenceid:cursor',
+ channel: channel_name,
+ presence: presence_sync_1,
+ timestamp: Time.now.to_i * 1000
+ )
+ anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message
+
+ sync_message = Ably::Models::ProtocolMessage.new(
+ action: action,
+ connection_serial: 11,
+ channel_serial: 'sequenceid:', # indicates SYNC is complete
+ channel: channel_name,
+ presence: presence_sync_2,
+ timestamp: Time.now.to_i * 1000
+ )
+ anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message
+ end
+ end
+ end
+
+ context 'with a single SYNC page' do
+ let(:present_action) { 1 }
+ let(:leave_action) { 3 }
+ let(:presence_sync) do
+ [
+ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action },
+ { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action },
+ { client_id: 'a', connection_id: 'one', id: 'one:1:0', action: leave_action }
+ ]
+ end
+
+ it 'is initiated and completed with a single SYNC message (and no channelSerial) (#RTP18a, #RTP18c) ', em_timeout: 15 do
+ presence_anonymous_client.get do |members|
+ expect(members.length).to eql(0)
+ expect(presence_anonymous_client).to be_sync_complete
+
+ presence_anonymous_client.subscribe(:present) do
+ expect(presence_anonymous_client).to_not be_sync_complete
+ presence_anonymous_client.get do |members|
+ expect(presence_anonymous_client).to be_sync_complete
+ expect(members.length).to eql(1)
+ expect(members.first.client_id).to eql('b')
+ stop_reactor
+ end
+ end
+
+ ## Fabricate server-initiated SYNC in two parts
+ action = Ably::Models::ProtocolMessage::ACTION.Sync
+ sync_message = Ably::Models::ProtocolMessage.new(
+ action: action,
+ connection_serial: 10,
+ channel: channel_name,
+ presence: presence_sync,
+ timestamp: Time.now.to_i * 1000
+ )
+ anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message
+ end
+ end
+ end
+
+ context 'when members exist in the PresenceMap before a SYNC completes' do
+ let(:enter_action) { Ably::Models::PresenceMessage::ACTION.Enter.to_i }
+ let(:present_action) { Ably::Models::PresenceMessage::ACTION.Present.to_i }
+ let(:presence_sync_protocol_message) do
+ [
+ { client_id: 'a', connection_id: 'one', id: 'one:0:0', action: present_action },
+ { client_id: 'b', connection_id: 'one', id: 'one:0:1', action: present_action }
+ ]
+ end
+ let(:presence_enter_message) do
+ Ably::Models::PresenceMessage.new(
+ 'id' => "#{random_str}:#{random_str}:0",
+ 'clientId' => random_str,
+ 'connectionId' => random_str,
+ 'timestamp' => as_since_epoch(Time.now),
+ 'action' => enter_action
+ )
+ end
+
+ it 'removes the members that are no longer present (#RTP19)', em_timeout: 15 do
+ presence_anonymous_client.get do |members|
+ expect(members.length).to eql(0)
+
+ # Now inject a fake member into the PresenceMap by faking the receive of a Presence message from Ably into the Presence object
+ presence_anonymous_client.__incoming_msgbus__.publish :presence, presence_enter_message
+
+ EventMachine.next_tick do
+ presence_anonymous_client.get do |members|
+ expect(members.length).to eql(1)
+ expect(members.first.client_id).to eql(presence_enter_message.client_id)
+
+ presence_events = []
+ presence_anonymous_client.subscribe do |presence_message|
+ presence_events << [presence_message.client_id, presence_message.action.to_sym]
+ if presence_message.action == :leave
+ expect(presence_message.id).to be_nil
+ expect(presence_message.timestamp.to_f * 1000).to be_within(20).of(Time.now.to_f * 1000)
+ end
+ end
+
+ ## Fabricate server-initiated SYNC in two parts
+ action = Ably::Models::ProtocolMessage::ACTION.Sync
+ sync_message = Ably::Models::ProtocolMessage.new(
+ action: action,
+ connection_serial: 10,
+ channel: channel_name,
+ presence: presence_sync_protocol_message,
+ timestamp: Time.now.to_i * 1000
+ )
+ anonymous_client.connection.__incoming_protocol_msgbus__.publish :protocol_message, sync_message
+
+ EventMachine.next_tick do
+ presence_anonymous_client.get do |members|
+ expect(members.length).to eql(2)
+ expect(members.find { |member| member.client_id == presence_enter_message.client_id}).to be_nil
+ expect(presence_events).to contain_exactly(
+ ['a', :present],
+ ['b', :present],
+ [presence_enter_message.client_id, :leave]
+ )
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+
+ context 'when the client does not have presence subscribe privileges but is present on the channel' do
+ let(:present_only_capability) do
+ { channel_name => ["presence"] }
+ end
+ let(:present_only_callback) { Proc.new { Ably::Rest::Client.new(client_options).auth.request_token(client_id: '*', capability: present_only_capability) } }
+ let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: present_only_callback)) }
+
+ it 'receives presence updates for all presence events generated by the current connection and the presence map is kept up to date (#RTP17a)' do
+ skip 'This functionality is not yet in sandbox, see https://github.com/ably/realtime/issues/656'
+
+ enter_client_ids = []
+ presence_client_one.subscribe(:enter) do |presence_message|
+ enter_client_ids << presence_message.client_id
+ end
+
+ leave_client_ids = []
+ presence_client_one.subscribe(:leave) do
+ leave_client_ids << presence_message.client_id
+ end
+
+ presence_client_one.enter_client 'bob' do
+ presence_client_one.enter_client 'sarah'
+ end
+
+ entered_count = 0
+ presence_client_one.subscribe(:enter) do
+ entered_count += 1
+ next unless entered_count == 2
+
+ presence_client_one.unsubscribe :enter
+ presence_client_one.get do |members|
+ EventMachine.add_timer(1) do
+ expect(members.map(&:client_id)).to contain_exactly('bob', 'sarah')
+ expect(enter_client_ids).to contain_exactly('bob', 'sarah')
+
+ presence_client_one.leave_client 'bob' do
+ presence_client_one.leave_client 'sarah'
+ end
+
+ leave_count = 0
+ presence_client_one.subscribe(:leave) do
+ leave_count += 1
+ next unless leave_count == 2
+
+ presence_client_one.get do |members|
+ expect(members.length).to eql(0)
+ expect(leave_client_ids).to contain_exactly('bob', 'sarah')
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+
+ context "local PresenceMap for presence members entered by this client" do
+ it "maintains a copy of the member map for any member that shares this connection's connection ID (#RTP17)" do
+ presence_client_one.enter do
+ presence_client_two.enter
+ end
+
+ entered_count = 0
+ presence_client_one.subscribe(:enter) do
+ entered_count += 1
+ next unless entered_count == 2
+ channel_anonymous_client.attach do
+ channel_anonymous_client.presence.get do |members|
+ expect(channel_anonymous_client.presence.members.local_members).to be_empty
+ expect(presence_client_one.members.local_members.length).to eql(1)
+ expect(presence_client_one.members.local_members.values.first.connection_id).to eql(client_one.connection.id)
+ expect(presence_client_two.members.local_members.values.first.connection_id).to eql(client_two.connection.id)
+ presence_client_two.leave
+ presence_client_two.subscribe(:leave) do
+ expect(presence_client_two.members.local_members).to be_empty
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
+ context 'when a channel becomes attached again' do
+ let(:attached_action) { Ably::Models::ProtocolMessage::ACTION.Attached.to_i }
+ let(:sync_action) { Ably::Models::ProtocolMessage::ACTION.Sync.to_i }
+ let(:presence_action) { Ably::Models::ProtocolMessage::ACTION.Presence.to_i }
+ let(:present_action) { Ably::Models::PresenceMessage::ACTION.Present.to_i }
+ let(:resume_flag) { 4 }
+ let(:presence_flag) { 1 }
+
+ def fabricate_incoming_protocol_message(protocol_message)
+ client_one.connection.__incoming_protocol_msgbus__.publish :protocol_message, protocol_message
+ end
+
+ # Prevents any messages from the WebSocket transport being sent / received
+ # Connection protocol message subscriptions are still active, but nothing reaches or comes from the WebSocket transport
+ def cripple_websocket_transport
+ client_one.connection.transport.__incoming_protocol_msgbus__.unsubscribe
+ client_one.connection.transport.__outgoing_protocol_msgbus__.unsubscribe
+ end
+
+ context 'and the resume flag is true' do
+ context 'and the presence flag is false' do
+ it 'does not send any presence events as the PresenceMap is in sync (#RTP5c1)' do
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
+ client_one.connection.transport.__outgoing_protocol_msgbus__.subscribe do |message|
+ raise "No presence state updates to Ably are expected. Message sent: #{message.to_json}" if client_one.connection.connected?
+ end
+
+ cripple_websocket_transport
+
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: attached_action,
+ channel: channel_name,
+ flags: resume_flag
+ )
+
+ EventMachine.add_timer(1) do
+ presence_client_one.get do |members|
+ expect(members.length).to eql(1)
+ expect(presence_client_one.members.local_members.length).to eql(1)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
+ context 'and the presence flag is true' do
+ context 'and following the SYNC all local MemberMap members are present in the PresenceMap' do
+ it 'does nothing as MemberMap is in sync (#RTP5c2)' do
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
+ expect(presence_client_one.members.length).to eql(1)
+ expect(presence_client_one.members.local_members.length).to eql(1)
+
+ presence_client_one.members.once(:in_sync) do
+ presence_client_one.get do |members|
+ expect(members.length).to eql(1)
+ expect(presence_client_one.members.local_members.length).to eql(1)
+ stop_reactor
+ end
+ end
+
+ client_one.connection.transport.__outgoing_protocol_msgbus__.subscribe do |message|
+ raise "No presence state updates to Ably are expected. Message sent: #{message.to_json}" if client_one.connection.connected?
+ end
+
+ cripple_websocket_transport
+
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: attached_action,
+ channel: channel_name,
+ flags: resume_flag + presence_flag
+ )
+
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: sync_action,
+ channel: channel_name,
+ presence: presence_client_one.members.map(&:shallow_clone).map(&:as_json),
+ channelSerial: nil # no further SYNC messages expected
+ )
+ end
+ end
+ end
+
+ context 'and following the SYNC a local MemberMap member is not present in the PresenceMap' do
+ it 're-enters the missing members automatically (#RTP5c2)' do
+ sync_check_completed = false
+
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
+ expect(presence_client_one.members.length).to eql(1)
+ expect(presence_client_one.members.local_members.length).to eql(1)
+
+ client_one.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |message|
+ next if message.action == :close # ignore finalization of connection
+
+ expect(message.action).to eq(:presence)
+ presence_message = message.presence.first
+ expect(presence_message.action).to eq(:enter)
+ expect(presence_message.client_id).to eq(client_one.auth.client_id)
+
+ presence_client_one.subscribe(:enter) do |message|
+ expect(message.connection_id).to eql(client_one.connection.id)
+ expect(message.client_id).to eq(client_one.auth.client_id)
+
+ EventMachine.next_tick do
+ expect(presence_client_one.members.length).to eql(2)
+ expect(presence_client_one.members.local_members.length).to eql(1)
+ expect(sync_check_completed).to be_truthy
+ stop_reactor
+ end
+ end
+
+ # Fabricate Ably sending back the Enter PresenceMessage to the client a short while after
+ # ensuring the PresenceMap for a short period does not have this member as to be expected in reality
+ EventMachine.add_timer(0.2) do
+ connection_id = random_str
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: presence_action,
+ channel: channel_name,
+ connectionId: client_one.connection.id,
+ connectionSerial: 50,
+ timestamp: as_since_epoch(Time.now),
+ presence: [presence_message.shallow_clone(id: "#{client_one.connection.id}:0:0", timestamp: as_since_epoch(Time.now)).as_json]
+ )
+ end
+ end
+
+ presence_client_one.members.once(:in_sync) do
+ # For a brief period, the client will have re-entered the missing members from the local_members
+ # but the enter from Ably will have not been received, so at this point the local_members will be empty
+ presence_client_one.get do |members|
+ expect(members.length).to eql(1)
+ expect(members.first.connection_id).to_not eql(client_one.connection.id)
+ expect(presence_client_one.members.local_members.length).to eql(0)
+ sync_check_completed = true
+ end
+ end
+
+ cripple_websocket_transport
+
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: attached_action,
+ channel: channel_name,
+ flags: resume_flag + presence_flag
+ )
+
+ # Complete the SYNC but without the member who was entered by this client
+ connection_id = random_str
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: sync_action,
+ channel: channel_name,
+ timestamp: as_since_epoch(Time.now),
+ presence: [{ id: "#{connection_id}:0:0", action: present_action, connection_id: connection_id, client_id: random_str }],
+ chanenlSerial: nil # no further SYNC messages expected
+ )
+ end
+ end
+ end
+ end
+ end
+
+ context 'and the resume flag is false' do
+ context 'and the presence flag is false' do
+ let(:member_data) { random_str }
+
+ it 'immediately resends all local presence members (#RTP5c2, #RTP19a)' do
+ in_sync_confirmed_no_local_members = false
+ local_member_leave_event_fired = false
+
+ presence_client_one.enter(member_data)
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
+ presence_client_one.subscribe(:leave) do |message|
+ # The local member will leave the PresenceMap due to the ATTACHED without Presence
+ local_member_leave_event_fired = true
+ end
+
+ # Local members re-entered automatically appear as updates due to the
+ # fabricated ATTACHED message sent and the members already being present
+ presence_client_one.subscribe(:update) do |message|
+ expect(local_member_leave_event_fired).to be_truthy
+ expect(message.data).to eq(member_data)
+ expect(message.client_id).to eq(client_one.auth.client_id)
+ EventMachine.next_tick do
+ expect(presence_client_one.members.length).to eql(1)
+ expect(presence_client_one.members.local_members.length).to eql(1)
+ expect(in_sync_confirmed_no_local_members).to be_truthy
+ stop_reactor
+ end
+ end
+
+ presence_client_one.members.once(:in_sync) do
+ # Immediately after SYNC (no sync actually occurred, but this event fires immediately after a channel SYNCs or is not expecting to SYNC)
+ expect(presence_client_one.members.length).to eql(0)
+ expect(presence_client_one.members.local_members.length).to eql(0)
+ in_sync_confirmed_no_local_members = true
+ end
+
+ # ATTACHED ProtocolMessage with no presence flag will clear the presence set immediately, #RTP19a
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: attached_action,
+ channel: channel_name,
+ flags: 0 # no resume or presence flag
+ )
+ end
+ end
+ end
+ end
+
+ context 'when re-entering a client automatically, if the re-enter fails for any reason' do
+ let(:client_one_options) do
+ client_options.merge(client_id: client_one_id, log_level: :error)
+ end
+ let(:client_one) { auto_close Ably::Realtime::Client.new(client_one_options) }
+
+ it 'should emit an ErrorInfo with error code 91004 (#RTP5c3)' do
+ presence_client_one.enter
+
+ # Wait for client to be entered
+ presence_client_one.subscribe(:enter) do
+ # Local member should not be re-entered as the request to re-enter will timeout
+ presence_client_one.subscribe(:update) do |message|
+ raise "Unexpected update, this should not happen as the re-enter fails"
+ end
+
+ channel_client_one.on(:update) do |channel_state_change|
+ next if channel_state_change.reason.nil? # first update is generated by the fabricated ATTACHED
+
+ expect(channel_state_change.reason.code).to eql(91004)
+ expect(channel_state_change.reason.message).to match(/#{client_one_id}/)
+ expect(channel_state_change.reason.message).to match(/Fabricated/) # fabricated message
+ expect(channel_state_change.reason.message).to match(/2345/) # fabricated error code
+ stop_reactor
+ end
+
+ cripple_websocket_transport
+
+ client_one.connection.__outgoing_protocol_msgbus__.subscribe do |protocol_message|
+ if protocol_message.action == :presence
+ # Fabricate a NACK for the re-enter message
+ EventMachine.add_timer(0.1) do
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: Ably::Models::ProtocolMessage::ACTION.Nack.to_i ,
+ channel: channel_name,
+ count: 1,
+ msg_serial: protocol_message.message_serial,
+ error: {
+ message: 'Fabricated',
+ code: 2345
+ }
+ )
+ end
+ end
+ end
+
+ # ATTACHED ProtocolMessage with no presence flag will clear the presence set immediately, #RTP19a
+ fabricate_incoming_protocol_message Ably::Models::ProtocolMessage.new(
+ action: attached_action,
+ channel: channel_name,
+ flags: 0 # no resume or presence flag
+ )
+ end
+ end
+ end
+ end
+ end
+
+ context 'channel state side effects' do
+ context 'channel transitions to the FAILED state' do
+ let(:anonymous_client) { auto_close Ably::Realtime::Client.new(client_options.merge(log_level: :fatal)) }
+ let(:client_one) { auto_close Ably::Realtime::Client.new(client_options.merge(client_id: client_one_id, log_level: :fatal)) }
+
+ it 'clears the PresenceMap and local member map copy and does not emit any presence events (#RTP5a)' do
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
+ channel_anonymous_client.attach do
+ presence_anonymous_client.get do |members|
+ expect(members.count).to eq(1)
+
+ presence_anonymous_client.subscribe { raise 'No presence events should be emitted' }
+ channel_anonymous_client.transition_state_machine! :failed, reason: RuntimeError.new
+ expect(presence_anonymous_client.members.length).to eq(0)
+ expect(channel_anonymous_client).to be_failed
+ presence_anonymous_client.unsubscribe # prevent events being sent to the channel from Ably as it is unaware it's FAILED
+
+ expect(presence_client_one.members.local_members.count).to eq(1)
+ channel_client_one.transition_state_machine! :failed
+ expect(channel_client_one).to be_failed
+ expect(presence_client_one.members.local_members.count).to eq(0)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
+ context 'channel transitions to the DETACHED state' do
+ it 'clears the PresenceMap and local member map copy and does not emit any presence events (#RTP5a)' do
+ presence_client_one.enter
+ presence_client_one.subscribe(:enter) do
+ presence_client_one.unsubscribe :enter
+
+ channel_anonymous_client.attach do
+ presence_anonymous_client.get do |members|
+ expect(members.count).to eq(1)
+
+ presence_anonymous_client.subscribe { raise 'No presence events should be emitted' }
+ channel_anonymous_client.detach do
+ expect(presence_anonymous_client.members.length).to eq(0)
+ expect(channel_anonymous_client).to be_detached
+
+ expect(presence_client_one.members.local_members.count).to eq(1)
+ channel_client_one.detach do
+ expect(presence_client_one.members.local_members.count).to eq(0)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+
+ context 'channel transitions to the SUSPENDED state' do
+ let(:auth_callback) do
+ Proc.new do
+ # Pause to allow presence updates to occur whilst disconnected
+ sleep 1
+ Ably::Rest::Client.new(client_options).auth.request_token
+ end
+ end
+ let(:anonymous_client) { auto_close Ably::Realtime::Client.new(client_options.merge(auth_callback: auth_callback)) }
+
+ it 'maintains the PresenceMap and only publishes presence event changes since the last attached state (#RTP5f)' do
+ presence_client_one.enter do
+ presence_client_two.enter
+ end
+
+ entered_count = 0
+ presence_client_one.subscribe(:enter) do
+ entered_count += 1
+ next unless entered_count == 2
+
+ presence_client_one.unsubscribe :enter
+ channel_anonymous_client.attach do
+ presence_anonymous_client.get do |members|
+ expect(members.count).to eq(2)
+
+ received_events = []
+ presence_anonymous_client.subscribe do |presence_message|
+ expect(presence_message.action).to eq(:leave)
+ expect(presence_message.client_id).to eql(client_one_id)
+ received_events << [:leave, presence_message.client_id]
+ end
+
+ # Kill the connection triggering an automatic reconnect and reattach of the channel that is about to put into the suspended state
+ anonymous_client.connection.transport.close_connection_after_writing
+
+ # Prevent the same connection resuming, we want a new connection and the channel SYNC to be sent
+ anonymous_client.connection.reset_resume_info
+
+ anonymous_client.connection.once(:disconnected) do
+ # Move to the SUSPENDED state and check presence map intact
+ channel_anonymous_client.transition_state_machine! :suspended
+
+ # Change the presence map state on that channel by getting one member to leave whilst the connection for anonymous client is diconnected
+ presence_client_one.leave
+
+ # Whilst SUSPENDED and DISCONNECTED, a get of the PresenceMap should still reveal two members
+ presence_anonymous_client.get(wait_for_sync: false) do |members|
+ expect(members.count).to eq(2)
+
+ channel_anonymous_client.once(:attached) do
+ presence_anonymous_client.get do |members|
+ expect(members.count).to eq(1)
+ EventMachine.add_timer(0.5) do
+ expect(received_events).to contain_exactly([:leave, client_one_id])
+ presence_anonymous_client.unsubscribe
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+ end
end
end
end
end
end