spec/acceptance/realtime/connection_spec.rb in ably-1.2.4 vs spec/acceptance/realtime/connection_spec.rb in ably-1.2.6
- old
+ new
@@ -100,10 +100,11 @@
# Authorize synchronously to ensure token has been issued
client.auth.authorize_sync(ttl: ttl)
end
let(:ttl) { 2 }
+ let(:clock_skew) { 0.1 } # 0.1 second clock skew
it 'renews token every time after it expires' do
started_at = Time.now.to_f
connected_times = 0
disconnected_times = 0
@@ -112,12 +113,12 @@
end
connection.on(:disconnected) do
disconnected_times += 1
if disconnected_times == 3
expect(connected_times).to eql(3)
- expect(Time.now.to_f - started_at).to be > ttl * 3
- expect(Time.now.to_f - started_at).to be < (ttl * 2) * 3
+ expect((Time.now.to_f - started_at) + clock_skew).to be > ttl * 3
+ expect((Time.now.to_f - started_at) - clock_skew).to be < (ttl * 2) * 3
stop_reactor
end
end
end
end
@@ -384,11 +385,11 @@
it 'fails the connection' do
expect(client.client_id).to eql('incompatible')
client.connection.once(:failed) do
expect(client.client_id).to eql('incompatible')
- expect(client.connection.error_reason.code).to eql(40101) # Invalid clientId for credentials
+ expect(client.connection.error_reason.code).to eql(40102) # Incompatible clientId for credentials
stop_reactor
end
end
end
end
@@ -741,62 +742,10 @@
end
end
end
end
- describe '#serial connection serial' do
- let(:channel) { client.channel(random_str) }
-
- it 'is set to -1 when a new connection is opened' do
- connection.connect do
- expect(connection.serial).to eql(-1)
- stop_reactor
- end
- end
-
- context 'when a message is sent but the ACK has not yet been received' do
- it 'the sent message msgSerial is 0 but the connection serial remains at -1' do
- channel.attach do
- connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
- if protocol_message.action == :message
- connection.__outgoing_protocol_msgbus__.unsubscribe
- expect(protocol_message['msgSerial']).to eql(0)
- expect(connection.serial).to eql(-1)
- stop_reactor
- end
- end
- channel.publish('event', 'data')
- end
- end
- end
-
- it 'is set to 0 when a message is received back' do
- channel.publish('event', 'data')
- channel.subscribe do
- expect(connection.serial).to eql(0)
- stop_reactor
- end
- end
-
- it 'is set to 1 when the second message is received' do
- channel.attach do
- messages = []
- channel.subscribe do |message|
- messages << message
- if messages.length == 2
- expect(connection.serial).to eql(1)
- stop_reactor
- end
- end
-
- channel.publish('event', 'data') do
- channel.publish('event', 'data')
- end
- end
- end
- end
-
describe '#msgSerial' do
context 'when messages are queued for publish before a connection is established' do
let(:batches) { 6 }
let(:messages_per_batch) { 10 }
@@ -919,11 +868,10 @@
context ":connected arrive when trying to close" do
let(:protocol_message_attributes) do
{
action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i,
- connection_serial: 55,
connection_details: {
max_idle_interval: 2 * 1000
}
}
end
@@ -1229,11 +1177,10 @@
context 'heartbeat interval' do
context 'when reduced artificially' do
let(:protocol_message_attributes) do
{
action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i,
- connection_serial: 55,
connection_details: {
max_idle_interval: 2 * 1000
}
}
end
@@ -1400,38 +1347,10 @@
end
let(:available_states) { self.class.available_states }
let(:states) { Hash.new }
let(:channel) { client.channel(random_str) }
- it 'is composed of connection key and serial that is kept up to date with each message ACK received' do
- connection.on(:connected) do
- expected_serial = -1
- expect(connection.key).to_not be_nil
- expect(connection.serial).to eql(expected_serial)
-
- channel.attach do
- channel.publish('event', 'data')
- channel.subscribe do
- channel.unsubscribe
-
- expected_serial += 1 # attach message received
- expect(connection.serial).to eql(expected_serial)
-
- channel.publish('event', 'data')
- channel.subscribe do
- channel.unsubscribe
- expected_serial += 1 # attach message received
- expect(connection.serial).to eql(expected_serial)
-
- expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}")
- stop_reactor
- end
- end
- end
- end
- end
-
it "is available when connection is in one of the states: #{available_states.join(', ')}" do
connection.once(:connected) do
allow(client).to receive(:endpoint).and_return(
URI::Generic.build(
scheme: 'wss',
@@ -1443,11 +1362,11 @@
connection.transition_state_machine! :disconnected
end
available_states.each do |state|
connection.on(state) do
- states[state.to_sym] = true if connection.recovery_key
+ states[state.to_sym] = true if connection.create_recovery_key
end
end
connection.once(:suspended) do
error_message = Ably::Models::ProtocolMessage.new(action: 9, error: { message: 'force failure' })
@@ -1461,61 +1380,47 @@
end
it 'is nil when connection is explicitly CLOSED' do
connection.once(:connected) do
connection.close do
- expect(connection.recovery_key).to be_nil
+ expect(connection.create_recovery_key).to be_nil
stop_reactor
end
end
end
end
context "opening a new connection using a recently disconnected connection's #recovery_key" do
context 'connection#id after recovery' do
it 'remains the same' do
previous_connection_id = nil
+ recovery_key = nil
connection.once(:connected) do
previous_connection_id = connection.id
+ recovery_key = client.connection.create_recovery_key
connection.transition_state_machine! :failed
end
connection.once(:failed) do
- recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key))
+ recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key))
recover_client.connection.on(:connected) do
expect(recover_client.connection.id).to eql(previous_connection_id)
stop_reactor
end
end
end
-
- it 'does not call a resume callback', api_private: true do
- connection.once(:connected) do
- connection.transition_state_machine! :failed
- end
-
- connection.once(:failed) do
- recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key))
- recover_client.connection.on_resume do
- raise 'Should not call the resume callback'
- end
- recover_client.connection.on(:connected) do
- EventMachine.add_timer(0.5) { stop_reactor }
- end
- end
- end
end
context 'when messages have been sent whilst the old connection is disconnected' do
describe 'the new connection' do
it 'recovers server-side queued messages' do
connection_id, recovery_key = nil, nil
channel.attach do
connection_id = client.connection.id
- recovery_key = client.connection.recovery_key
+ recovery_key = client.connection.create_recovery_key
connection.transport.__incoming_protocol_msgbus__
publishing_client_channel.publish('event', 'message') do
connection.transition_state_machine! :failed
end
end
@@ -1545,11 +1450,11 @@
connection_id = client.connection.id
connection.transport.__incoming_protocol_msgbus__
channel.publish('event', 'message') do
msg_serial = connection.send(:client_msg_serial)
expect(msg_serial).to eql(0)
- recovery_key = client.connection.recovery_key
+ recovery_key = client.connection.create_recovery_key
connection.transition_state_machine! :failed
end
end
connection.on(:failed) do
@@ -1576,11 +1481,11 @@
connection.transport.__incoming_protocol_msgbus__
channel.subscribe('event') do |message|
expect(message.data).to eql('message-1')
msg_serial = connection.send(:client_msg_serial)
expect(msg_serial).to eql(0)
- recovery_key = client.connection.recovery_key
+ recovery_key = client.connection.create_recovery_key
connection.transition_state_machine! :failed
end
channel.publish('event', 'message-1')
end
@@ -1610,27 +1515,33 @@
end
end
context 'with :recover option' do
context 'with invalid syntax' do
- let(:invaid_client_options) { default_options.merge(recover: 'invalid') }
+ let(:client_options) { default_options.merge(recover: 'invalid') }
- it 'raises an exception' do
- expect { Ably::Realtime::Client.new(invaid_client_options) }.to raise_error ArgumentError, /Recover/
- stop_reactor
+ it 'logs recovery decode error as a warning and connects successfully' do
+ connection.once(:connected) do
+ EventMachine.add_timer(1) { stop_reactor }
+ end
+ expect(client.logger).to receive(:warn).at_least(:once) do |*args, &block|
+ expect(args.concat([block ? block.call : nil]).join(',')).to match(/unable to decode recovery key/)
+ end
end
end
- context 'with expired (missing) value sent to server' do
- let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0:0', log_level: :fatal) }
+ context 'with invalid connection key' do
+ recovery_key = "{\"connection_key\":\"0123456789abcdef-99\",\"msg_serial\":2," <<
+ "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}"
+ let(:client_options) { default_options.merge(recover: recovery_key, log_level: :fatal) }
it 'connects but sets the error reason and includes the reason in the state change' do
connection.once(:connected) do |state_change|
expect(connection.state).to eq(:connected)
- expect(state_change.reason.message).to match(/Unable to recover connection/i)
- expect(connection.error_reason.message).to match(/Unable to recover connection/i)
- expect(connection.error_reason.code).to eql(80008)
+ expect(state_change.reason.message).to match(/Invalid connection key/i)
+ expect(connection.error_reason.message).to match(/Invalid connection key/i)
+ expect(connection.error_reason.code).to eql(80018)
expect(connection.error_reason).to eql(state_change.reason)
stop_reactor
end
end
end
@@ -1700,27 +1611,27 @@
expect(connection.internet_up?).to be_a(EventMachine::Deferrable)
stop_reactor
end
context 'internet up URL protocol' do
- let(:http_request) { double('EventMachine::HttpRequest', get: EventMachine::DefaultDeferrable.new) }
+ let(:http_request) { double('EventMachine::AblyHttpRequest::HttpRequest', get: EventMachine::DefaultDeferrable.new) }
context 'when using TLS for the connection' do
let(:client_options) { default_options.merge(tls: true) }
it 'uses TLS for the Internet check to https://internet-up.ably-realtime.com/is-the-internet-up.txt' do
- expect(EventMachine::HttpRequest).to receive(:new).with('https://internet-up.ably-realtime.com/is-the-internet-up.txt', { tls: { verify_peer: true } }).and_return(http_request)
+ expect(EventMachine::AblyHttpRequest::HttpRequest).to receive(:new).with('https://internet-up.ably-realtime.com/is-the-internet-up.txt', { tls: { verify_peer: true } }).and_return(http_request)
connection.internet_up?
stop_reactor
end
end
context 'when using a non-secured connection' do
let(:client_options) { default_options.merge(tls: false, use_token_auth: true) }
it 'uses TLS for the Internet check to http://internet-up.ably-realtime.com/is-the-internet-up.txt' do
- expect(EventMachine::HttpRequest).to receive(:new).with('http://internet-up.ably-realtime.com/is-the-internet-up.txt', { tls: { verify_peer: true } }).and_return(http_request)
+ expect(EventMachine::AblyHttpRequest::HttpRequest).to receive(:new).with('http://internet-up.ably-realtime.com/is-the-internet-up.txt', { tls: { verify_peer: true } }).and_return(http_request)
connection.internet_up?
stop_reactor
end
end
end
@@ -1730,21 +1641,21 @@
context 'with a TLS connection' do
let(:client_options) { default_options.merge(tls: true) }
it 'checks the Internet up URL over TLS' do
- expect(EventMachine::HttpRequest).to receive(:new).with("https:#{Ably::INTERNET_CHECK.fetch(:url)}", { tls: { verify_peer: true } }).and_return(double('request', get: EventMachine::DefaultDeferrable.new))
+ expect(EventMachine::AblyHttpRequest::HttpRequest).to receive(:new).with("https:#{Ably::INTERNET_CHECK.fetch(:url)}", { tls: { verify_peer: true } }).and_return(double('request', get: EventMachine::DefaultDeferrable.new))
connection.internet_up?
stop_reactor
end
end
context 'with a non-TLS connection' do
let(:client_options) { default_options.merge(tls: false, use_token_auth: true) }
it 'checks the Internet up URL over TLS' do
- expect(EventMachine::HttpRequest).to receive(:new).with("http:#{Ably::INTERNET_CHECK.fetch(:url)}", { tls: { verify_peer: true } }).and_return(double('request', get: EventMachine::DefaultDeferrable.new))
+ expect(EventMachine::AblyHttpRequest::HttpRequest).to receive(:new).with("http:#{Ably::INTERNET_CHECK.fetch(:url)}", { tls: { verify_peer: true } }).and_return(double('request', get: EventMachine::DefaultDeferrable.new))
connection.internet_up?
stop_reactor
end
end
@@ -1998,11 +1909,10 @@
context 'when a CONNECTED message is received (#RTN24)' do
let(:connection_key) { random_str(32) }
let(:protocol_message_attributes) do
{
action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i,
- connection_serial: 55,
connection_details: {
client_id: 'bob',
connection_key: connection_key,
connection_state_ttl: 33 * 1000,
max_frame_size: 555,
@@ -2034,11 +1944,10 @@
expect(client.auth.client_id).to eql('*')
connection.once(:update) do |connection_state_change|
expect(client.auth.client_id).to eql('bob')
expect(connection.key).to eql(connection_key)
- expect(connection.serial).to eql(55)
expect(connection.connection_state_ttl).to eql(33)
expect(connection.details.client_id).to eql('bob')
expect(connection.details.connection_key).to eql(connection_key)
expect(connection.details.connection_state_ttl).to eql(33)
@@ -2057,11 +1966,10 @@
context 'when a CONNECTED message with an error is received' do
let(:protocol_message_attributes) do
{
action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i,
- connection_serial: 22,
error: { code: 50000, message: 'Internal failure' },
}
end
it 'emits an UPDATE event' do
@@ -2086,10 +1994,10 @@
context 'version params' do
it 'sends the protocol version param v (#G4, #RTN2f)' do
expect(EventMachine).to receive(:connect) do |host, port, transport, object, url|
uri = URI.parse(url)
- expect(CGI::parse(uri.query)['v'][0]).to eql('1.2')
+ expect(CGI::parse(uri.query)['v'][0]).to eql('2')
stop_reactor
end
client
end