spec/acceptance/realtime/connection_spec.rb in ably-1.1.0 vs spec/acceptance/realtime/connection_spec.rb in ably-1.1.1

- old
+ new

@@ -262,11 +262,11 @@ total_expected = publish_count * iteration publish_count.times.each { |index| publishing_channel.publish('event', (total_expected - publish_count + index).to_s) } channel.subscribe('event') do |message| messages_received << message.data.to_i if messages_received.count == total_expected - expect(messages_received).to match(total_expected.times) + expect(messages_received).to match(total_expected.times.to_a) expect(auth_requests.count).to eql(iteration + 1) EventMachine.add_timer(1) do channel.unsubscribe 'event' yield end @@ -647,20 +647,51 @@ stop_reactor end end it 'is set to 1 when the second message is received' do - channel.publish('event', 'data') do - channel.publish('event', 'data') + channel.attach do + messages = [] + channel.subscribe do |message| + messages << message + if messages.length == 2 + expect(connection.serial).to eql(1) + stop_reactor + end + end + + channel.publish('event', 'data') do + channel.publish('event', 'data') + end end + end + end - messages = [] - channel.subscribe do |message| - messages << message - if messages.length == 2 - expect(connection.serial).to eql(1) - stop_reactor + describe '#msgSerial' do + context 'when messages are queued for publish before a connection is established' do + let(:batches) { 6 } + let(:messages_per_batch) { 10 } + + let(:publishing_client) { auto_close Ably::Realtime::Client.new(default_options) } + let(:channel_name) { random_str } + let(:publishing_channel) { publishing_client.channels.get(channel_name) } + let(:receiving_channel) { client.channels.get(channel_name) } + + it 'the msgSerial is always incrementing (and not reset when the new connection is established) ensuring messages are never de-duped by the realtime service' do + messages = [] + + receiving_channel.attach do + receiving_channel.subscribe('event') do |message| + messages << message + stop_reactor if messages.count == batches * messages_per_batch + end + + batches.times do |batch| + EventMachine.add_timer(batch.to_f / batches.to_f) do + messages_per_batch.times { |index| publishing_channel.publish('event') } + end + end end end end end @@ -1003,14 +1034,13 @@ context 'with websocket heartbeats disabled (undocumented)' do let(:client_options) { default_options.merge(websocket_heartbeats_disabled: true) } it 'does not provide the heartbeats argument in the websocket connection params (#RTN23b)' do - skip 'Native heartbeats not yet supported in the WS driver https://github.com/ably/ably-ruby/issues/116' expect(EventMachine).to receive(:connect) do |host, port, transport, object, url| uri = URI.parse(url) - expect(CGI::parse(uri.query)['heartbeats'][0]).to be_nil + expect(CGI::parse(uri.query)['heartbeats'][0]).to eql('true') stop_reactor end client end @@ -1124,11 +1154,11 @@ channel.subscribe do channel.unsubscribe expected_serial += 1 # attach message received expect(connection.serial).to eql(expected_serial) - expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}") + expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}") stop_reactor end end end end @@ -1235,10 +1265,82 @@ end end end end end + + context 'when messages have been published' do + describe 'the new connection' do + it 'uses the correct msgSerial from the old connection' do + msg_serial, recovery_key, connection_id = nil, nil, nil + + channel.attach do + expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet + connection_id = client.connection.id + connection.transport.__incoming_protocol_msgbus__ + channel.publish('event', 'message') do + msg_serial = connection.send(:client_msg_serial) + expect(msg_serial).to eql(0) + recovery_key = client.connection.recovery_key + connection.transition_state_machine! :failed + end + end + + connection.on(:failed) do + recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key)) + recover_client_channel = recover_client.channel(channel_name) + recover_client_channel.attach do + expect(recover_client.connection.id).to eql(connection_id) + expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial) + stop_reactor + end + end + end + end + end + + context 'when messages are published before the new connection is recovered' do + describe 'the new connection' do + it 'uses the correct msgSerial from the old connection for the queued messages' do + msg_serial, recovery_key, connection_id = nil, nil, nil + + channel.attach do + expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet + connection_id = client.connection.id + connection.transport.__incoming_protocol_msgbus__ + channel.publish('event', 'message-1') do + msg_serial = connection.send(:client_msg_serial) + expect(msg_serial).to eql(0) + recovery_key = client.connection.recovery_key + connection.transition_state_machine! :failed + end + end + + connection.on(:failed) do + recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key)) + recover_client_channel = recover_client.channel(channel_name) + expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial) + + recover_client.connection.once(:connecting) do + recover_client_channel.publish('event', 'message-2') + expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial + 1) + end + + recover_client_channel.attach do + expect(recover_client.connection.id).to eql(connection_id) + + recover_client_channel.subscribe do |message| + raise "Unexpected message #{message}" if message.data != 'message-2' + EventMachine.add_timer(2) do + stop_reactor + end + end + end + end + end + end + end end context 'with :recover option' do context 'with invalid syntax' do let(:invaid_client_options) { default_options.merge(recover: 'invalid') } @@ -1248,11 +1350,11 @@ stop_reactor end end context 'with invalid formatted value sent to server' do - let(:client_options) { default_options.merge(recover: 'not-a-valid-connection-key:1', log_level: :none) } + let(:client_options) { default_options.merge(recover: 'not-a-valid-connection-key:1:0', log_level: :none) } it 'sets the #error_reason and moves the connection to FAILED' do connection.once(:failed) do |state_change| expect(connection.state).to eq(:failed) expect(state_change.reason.message).to match(/Invalid connectionKey/i) @@ -1263,10 +1365,10 @@ end end end context 'with expired (missing) value sent to server' do - let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0', log_level: :fatal) } + let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0:0', log_level: :fatal) } it 'connects but sets the error reason and includes the reason in the state change' do connection.once(:connected) do |state_change| expect(connection.state).to eq(:connected) expect(state_change.reason.message).to match(/Unable to recover connection/i)