spec/acceptance/realtime/connection_spec.rb in ably-1.1.0 vs spec/acceptance/realtime/connection_spec.rb in ably-1.1.1
- old
+ new
@@ -262,11 +262,11 @@
total_expected = publish_count * iteration
publish_count.times.each { |index| publishing_channel.publish('event', (total_expected - publish_count + index).to_s) }
channel.subscribe('event') do |message|
messages_received << message.data.to_i
if messages_received.count == total_expected
- expect(messages_received).to match(total_expected.times)
+ expect(messages_received).to match(total_expected.times.to_a)
expect(auth_requests.count).to eql(iteration + 1)
EventMachine.add_timer(1) do
channel.unsubscribe 'event'
yield
end
@@ -647,20 +647,51 @@
stop_reactor
end
end
it 'is set to 1 when the second message is received' do
- channel.publish('event', 'data') do
- channel.publish('event', 'data')
+ channel.attach do
+ messages = []
+ channel.subscribe do |message|
+ messages << message
+ if messages.length == 2
+ expect(connection.serial).to eql(1)
+ stop_reactor
+ end
+ end
+
+ channel.publish('event', 'data') do
+ channel.publish('event', 'data')
+ end
end
+ end
+ end
- messages = []
- channel.subscribe do |message|
- messages << message
- if messages.length == 2
- expect(connection.serial).to eql(1)
- stop_reactor
+ describe '#msgSerial' do
+ context 'when messages are queued for publish before a connection is established' do
+ let(:batches) { 6 }
+ let(:messages_per_batch) { 10 }
+
+ let(:publishing_client) { auto_close Ably::Realtime::Client.new(default_options) }
+ let(:channel_name) { random_str }
+ let(:publishing_channel) { publishing_client.channels.get(channel_name) }
+ let(:receiving_channel) { client.channels.get(channel_name) }
+
+ it 'the msgSerial is always incrementing (and not reset when the new connection is established) ensuring messages are never de-duped by the realtime service' do
+ messages = []
+
+ receiving_channel.attach do
+ receiving_channel.subscribe('event') do |message|
+ messages << message
+ stop_reactor if messages.count == batches * messages_per_batch
+ end
+
+ batches.times do |batch|
+ EventMachine.add_timer(batch.to_f / batches.to_f) do
+ messages_per_batch.times { |index| publishing_channel.publish('event') }
+ end
+ end
end
end
end
end
@@ -1003,14 +1034,13 @@
context 'with websocket heartbeats disabled (undocumented)' do
let(:client_options) { default_options.merge(websocket_heartbeats_disabled: true) }
it 'does not provide the heartbeats argument in the websocket connection params (#RTN23b)' do
- skip 'Native heartbeats not yet supported in the WS driver https://github.com/ably/ably-ruby/issues/116'
expect(EventMachine).to receive(:connect) do |host, port, transport, object, url|
uri = URI.parse(url)
- expect(CGI::parse(uri.query)['heartbeats'][0]).to be_nil
+ expect(CGI::parse(uri.query)['heartbeats'][0]).to eql('true')
stop_reactor
end
client
end
@@ -1124,11 +1154,11 @@
channel.subscribe do
channel.unsubscribe
expected_serial += 1 # attach message received
expect(connection.serial).to eql(expected_serial)
- expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}")
+ expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}")
stop_reactor
end
end
end
end
@@ -1235,10 +1265,82 @@
end
end
end
end
end
+
+ context 'when messages have been published' do
+ describe 'the new connection' do
+ it 'uses the correct msgSerial from the old connection' do
+ msg_serial, recovery_key, connection_id = nil, nil, nil
+
+ channel.attach do
+ expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet
+ connection_id = client.connection.id
+ connection.transport.__incoming_protocol_msgbus__
+ channel.publish('event', 'message') do
+ msg_serial = connection.send(:client_msg_serial)
+ expect(msg_serial).to eql(0)
+ recovery_key = client.connection.recovery_key
+ connection.transition_state_machine! :failed
+ end
+ end
+
+ connection.on(:failed) do
+ recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key))
+ recover_client_channel = recover_client.channel(channel_name)
+ recover_client_channel.attach do
+ expect(recover_client.connection.id).to eql(connection_id)
+ expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
+ context 'when messages are published before the new connection is recovered' do
+ describe 'the new connection' do
+ it 'uses the correct msgSerial from the old connection for the queued messages' do
+ msg_serial, recovery_key, connection_id = nil, nil, nil
+
+ channel.attach do
+ expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet
+ connection_id = client.connection.id
+ connection.transport.__incoming_protocol_msgbus__
+ channel.publish('event', 'message-1') do
+ msg_serial = connection.send(:client_msg_serial)
+ expect(msg_serial).to eql(0)
+ recovery_key = client.connection.recovery_key
+ connection.transition_state_machine! :failed
+ end
+ end
+
+ connection.on(:failed) do
+ recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key))
+ recover_client_channel = recover_client.channel(channel_name)
+ expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial)
+
+ recover_client.connection.once(:connecting) do
+ recover_client_channel.publish('event', 'message-2')
+ expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial + 1)
+ end
+
+ recover_client_channel.attach do
+ expect(recover_client.connection.id).to eql(connection_id)
+
+ recover_client_channel.subscribe do |message|
+ raise "Unexpected message #{message}" if message.data != 'message-2'
+ EventMachine.add_timer(2) do
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+ end
end
context 'with :recover option' do
context 'with invalid syntax' do
let(:invaid_client_options) { default_options.merge(recover: 'invalid') }
@@ -1248,11 +1350,11 @@
stop_reactor
end
end
context 'with invalid formatted value sent to server' do
- let(:client_options) { default_options.merge(recover: 'not-a-valid-connection-key:1', log_level: :none) }
+ let(:client_options) { default_options.merge(recover: 'not-a-valid-connection-key:1:0', log_level: :none) }
it 'sets the #error_reason and moves the connection to FAILED' do
connection.once(:failed) do |state_change|
expect(connection.state).to eq(:failed)
expect(state_change.reason.message).to match(/Invalid connectionKey/i)
@@ -1263,10 +1365,10 @@
end
end
end
context 'with expired (missing) value sent to server' do
- let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0', log_level: :fatal) }
+ let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0:0', log_level: :fatal) }
it 'connects but sets the error reason and includes the reason in the state change' do
connection.once(:connected) do |state_change|
expect(connection.state).to eq(:connected)
expect(state_change.reason.message).to match(/Unable to recover connection/i)