spec/acceptance/realtime/connection_spec.rb in ably-1.2.4 vs spec/acceptance/realtime/connection_spec.rb in ably-1.2.6

- old
+ new

@@ -100,10 +100,11 @@ # Authorize synchronously to ensure token has been issued client.auth.authorize_sync(ttl: ttl) end let(:ttl) { 2 } + let(:clock_skew) { 0.1 } # 0.1 second clock skew it 'renews token every time after it expires' do started_at = Time.now.to_f connected_times = 0 disconnected_times = 0 @@ -112,12 +113,12 @@ end connection.on(:disconnected) do disconnected_times += 1 if disconnected_times == 3 expect(connected_times).to eql(3) - expect(Time.now.to_f - started_at).to be > ttl * 3 - expect(Time.now.to_f - started_at).to be < (ttl * 2) * 3 + expect((Time.now.to_f - started_at) + clock_skew).to be > ttl * 3 + expect((Time.now.to_f - started_at) - clock_skew).to be < (ttl * 2) * 3 stop_reactor end end end end @@ -384,11 +385,11 @@ it 'fails the connection' do expect(client.client_id).to eql('incompatible') client.connection.once(:failed) do expect(client.client_id).to eql('incompatible') - expect(client.connection.error_reason.code).to eql(40101) # Invalid clientId for credentials + expect(client.connection.error_reason.code).to eql(40102) # Incompatible clientId for credentials stop_reactor end end end end @@ -741,62 +742,10 @@ end end end end - describe '#serial connection serial' do - let(:channel) { client.channel(random_str) } - - it 'is set to -1 when a new connection is opened' do - connection.connect do - expect(connection.serial).to eql(-1) - stop_reactor - end - end - - context 'when a message is sent but the ACK has not yet been received' do - it 'the sent message msgSerial is 0 but the connection serial remains at -1' do - channel.attach do - connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| - if protocol_message.action == :message - connection.__outgoing_protocol_msgbus__.unsubscribe - expect(protocol_message['msgSerial']).to eql(0) - expect(connection.serial).to eql(-1) - stop_reactor - end - end - channel.publish('event', 'data') - end - end - end - - it 'is set to 0 when a message is received back' do - channel.publish('event', 'data') - channel.subscribe do - expect(connection.serial).to eql(0) - stop_reactor - end - end - - it 'is set to 1 when the second message is received' do - channel.attach do - messages = [] - channel.subscribe do |message| - messages << message - if messages.length == 2 - expect(connection.serial).to eql(1) - stop_reactor - end - end - - channel.publish('event', 'data') do - channel.publish('event', 'data') - end - end - end - end - describe '#msgSerial' do context 'when messages are queued for publish before a connection is established' do let(:batches) { 6 } let(:messages_per_batch) { 10 } @@ -919,11 +868,10 @@ context ":connected arrive when trying to close" do let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } } end @@ -1229,11 +1177,10 @@ context 'heartbeat interval' do context 'when reduced artificially' do let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } } end @@ -1400,38 +1347,10 @@ end let(:available_states) { self.class.available_states } let(:states) { Hash.new } let(:channel) { client.channel(random_str) } - it 'is composed of connection key and serial that is kept up to date with each message ACK received' do - connection.on(:connected) do - expected_serial = -1 - expect(connection.key).to_not be_nil - expect(connection.serial).to eql(expected_serial) - - channel.attach do - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}") - stop_reactor - end - end - end - end - end - it "is available when connection is in one of the states: #{available_states.join(', ')}" do connection.once(:connected) do allow(client).to receive(:endpoint).and_return( URI::Generic.build( scheme: 'wss', @@ -1443,11 +1362,11 @@ connection.transition_state_machine! :disconnected end available_states.each do |state| connection.on(state) do - states[state.to_sym] = true if connection.recovery_key + states[state.to_sym] = true if connection.create_recovery_key end end connection.once(:suspended) do error_message = Ably::Models::ProtocolMessage.new(action: 9, error: { message: 'force failure' }) @@ -1461,61 +1380,47 @@ end it 'is nil when connection is explicitly CLOSED' do connection.once(:connected) do connection.close do - expect(connection.recovery_key).to be_nil + expect(connection.create_recovery_key).to be_nil stop_reactor end end end end context "opening a new connection using a recently disconnected connection's #recovery_key" do context 'connection#id after recovery' do it 'remains the same' do previous_connection_id = nil + recovery_key = nil connection.once(:connected) do previous_connection_id = connection.id + recovery_key = client.connection.create_recovery_key connection.transition_state_machine! :failed end connection.once(:failed) do - recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key)) + recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key)) recover_client.connection.on(:connected) do expect(recover_client.connection.id).to eql(previous_connection_id) stop_reactor end end end - - it 'does not call a resume callback', api_private: true do - connection.once(:connected) do - connection.transition_state_machine! :failed - end - - connection.once(:failed) do - recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key)) - recover_client.connection.on_resume do - raise 'Should not call the resume callback' - end - recover_client.connection.on(:connected) do - EventMachine.add_timer(0.5) { stop_reactor } - end - end - end end context 'when messages have been sent whilst the old connection is disconnected' do describe 'the new connection' do it 'recovers server-side queued messages' do connection_id, recovery_key = nil, nil channel.attach do connection_id = client.connection.id - recovery_key = client.connection.recovery_key + recovery_key = client.connection.create_recovery_key connection.transport.__incoming_protocol_msgbus__ publishing_client_channel.publish('event', 'message') do connection.transition_state_machine! :failed end end @@ -1545,11 +1450,11 @@ connection_id = client.connection.id connection.transport.__incoming_protocol_msgbus__ channel.publish('event', 'message') do msg_serial = connection.send(:client_msg_serial) expect(msg_serial).to eql(0) - recovery_key = client.connection.recovery_key + recovery_key = client.connection.create_recovery_key connection.transition_state_machine! :failed end end connection.on(:failed) do @@ -1576,11 +1481,11 @@ connection.transport.__incoming_protocol_msgbus__ channel.subscribe('event') do |message| expect(message.data).to eql('message-1') msg_serial = connection.send(:client_msg_serial) expect(msg_serial).to eql(0) - recovery_key = client.connection.recovery_key + recovery_key = client.connection.create_recovery_key connection.transition_state_machine! :failed end channel.publish('event', 'message-1') end @@ -1610,27 +1515,33 @@ end end context 'with :recover option' do context 'with invalid syntax' do - let(:invaid_client_options) { default_options.merge(recover: 'invalid') } + let(:client_options) { default_options.merge(recover: 'invalid') } - it 'raises an exception' do - expect { Ably::Realtime::Client.new(invaid_client_options) }.to raise_error ArgumentError, /Recover/ - stop_reactor + it 'logs recovery decode error as a warning and connects successfully' do + connection.once(:connected) do + EventMachine.add_timer(1) { stop_reactor } + end + expect(client.logger).to receive(:warn).at_least(:once) do |*args, &block| + expect(args.concat([block ? block.call : nil]).join(',')).to match(/unable to decode recovery key/) + end end end - context 'with expired (missing) value sent to server' do - let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0:0', log_level: :fatal) } + context 'with invalid connection key' do + recovery_key = "{\"connection_key\":\"0123456789abcdef-99\",\"msg_serial\":2," << + "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}" + let(:client_options) { default_options.merge(recover: recovery_key, log_level: :fatal) } it 'connects but sets the error reason and includes the reason in the state change' do connection.once(:connected) do |state_change| expect(connection.state).to eq(:connected) - expect(state_change.reason.message).to match(/Unable to recover connection/i) - expect(connection.error_reason.message).to match(/Unable to recover connection/i) - expect(connection.error_reason.code).to eql(80008) + expect(state_change.reason.message).to match(/Invalid connection key/i) + expect(connection.error_reason.message).to match(/Invalid connection key/i) + expect(connection.error_reason.code).to eql(80018) expect(connection.error_reason).to eql(state_change.reason) stop_reactor end end end @@ -1700,27 +1611,27 @@ expect(connection.internet_up?).to be_a(EventMachine::Deferrable) stop_reactor end context 'internet up URL protocol' do - let(:http_request) { double('EventMachine::HttpRequest', get: EventMachine::DefaultDeferrable.new) } + let(:http_request) { double('EventMachine::AblyHttpRequest::HttpRequest', get: EventMachine::DefaultDeferrable.new) } context 'when using TLS for the connection' do let(:client_options) { default_options.merge(tls: true) } it 'uses TLS for the Internet check to https://internet-up.ably-realtime.com/is-the-internet-up.txt' do - expect(EventMachine::HttpRequest).to receive(:new).with('https://internet-up.ably-realtime.com/is-the-internet-up.txt', { tls: { verify_peer: true } }).and_return(http_request) + expect(EventMachine::AblyHttpRequest::HttpRequest).to receive(:new).with('https://internet-up.ably-realtime.com/is-the-internet-up.txt', { tls: { verify_peer: true } }).and_return(http_request) connection.internet_up? stop_reactor end end context 'when using a non-secured connection' do let(:client_options) { default_options.merge(tls: false, use_token_auth: true) } it 'uses TLS for the Internet check to http://internet-up.ably-realtime.com/is-the-internet-up.txt' do - expect(EventMachine::HttpRequest).to receive(:new).with('http://internet-up.ably-realtime.com/is-the-internet-up.txt', { tls: { verify_peer: true } }).and_return(http_request) + expect(EventMachine::AblyHttpRequest::HttpRequest).to receive(:new).with('http://internet-up.ably-realtime.com/is-the-internet-up.txt', { tls: { verify_peer: true } }).and_return(http_request) connection.internet_up? stop_reactor end end end @@ -1730,21 +1641,21 @@ context 'with a TLS connection' do let(:client_options) { default_options.merge(tls: true) } it 'checks the Internet up URL over TLS' do - expect(EventMachine::HttpRequest).to receive(:new).with("https:#{Ably::INTERNET_CHECK.fetch(:url)}", { tls: { verify_peer: true } }).and_return(double('request', get: EventMachine::DefaultDeferrable.new)) + expect(EventMachine::AblyHttpRequest::HttpRequest).to receive(:new).with("https:#{Ably::INTERNET_CHECK.fetch(:url)}", { tls: { verify_peer: true } }).and_return(double('request', get: EventMachine::DefaultDeferrable.new)) connection.internet_up? stop_reactor end end context 'with a non-TLS connection' do let(:client_options) { default_options.merge(tls: false, use_token_auth: true) } it 'checks the Internet up URL over TLS' do - expect(EventMachine::HttpRequest).to receive(:new).with("http:#{Ably::INTERNET_CHECK.fetch(:url)}", { tls: { verify_peer: true } }).and_return(double('request', get: EventMachine::DefaultDeferrable.new)) + expect(EventMachine::AblyHttpRequest::HttpRequest).to receive(:new).with("http:#{Ably::INTERNET_CHECK.fetch(:url)}", { tls: { verify_peer: true } }).and_return(double('request', get: EventMachine::DefaultDeferrable.new)) connection.internet_up? stop_reactor end end @@ -1998,11 +1909,10 @@ context 'when a CONNECTED message is received (#RTN24)' do let(:connection_key) { random_str(32) } let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { client_id: 'bob', connection_key: connection_key, connection_state_ttl: 33 * 1000, max_frame_size: 555, @@ -2034,11 +1944,10 @@ expect(client.auth.client_id).to eql('*') connection.once(:update) do |connection_state_change| expect(client.auth.client_id).to eql('bob') expect(connection.key).to eql(connection_key) - expect(connection.serial).to eql(55) expect(connection.connection_state_ttl).to eql(33) expect(connection.details.client_id).to eql('bob') expect(connection.details.connection_key).to eql(connection_key) expect(connection.details.connection_state_ttl).to eql(33) @@ -2057,11 +1966,10 @@ context 'when a CONNECTED message with an error is received' do let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 22, error: { code: 50000, message: 'Internal failure' }, } end it 'emits an UPDATE event' do @@ -2086,10 +1994,10 @@ context 'version params' do it 'sends the protocol version param v (#G4, #RTN2f)' do expect(EventMachine).to receive(:connect) do |host, port, transport, object, url| uri = URI.parse(url) - expect(CGI::parse(uri.query)['v'][0]).to eql('1.2') + expect(CGI::parse(uri.query)['v'][0]).to eql('2') stop_reactor end client end