spec/acceptance/realtime/connection_spec.rb in ably-0.6.2 vs spec/acceptance/realtime/connection_spec.rb in ably-0.7.0

- old
+ new

@@ -1,181 +1,655 @@ +# encoding: utf-8 require 'spec_helper' -describe Ably::Realtime::Connection do - include RSpec::EventMachine - +describe Ably::Realtime::Connection, :event_machine do let(:connection) { client.connection } - [:json, :msgpack].each do |protocol| - context "over #{protocol}" do - let(:default_options) do - { api_key: api_key, environment: environment, protocol: protocol } + vary_by_protocol do + let(:default_options) do + { api_key: api_key, environment: environment, protocol: protocol } + end + + let(:client_options) { default_options } + let(:client) { Ably::Realtime::Client.new(client_options) } + + before(:example) do + EventMachine.add_shutdown_hook do + connection.off # minimise side effects of callbacks from finished test calling stop_reactor end + end - let(:client) do - Ably::Realtime::Client.new(default_options) + context 'intialization' do + it 'connects automatically' do + connection.on(:connected) do + expect(connection.state).to eq(:connected) + stop_reactor + end end - context 'with API key' do - it 'connects automatically' do - run_reactor do - connection.on(:connected) do - expect(connection.state).to eq(:connected) - expect(client.auth.auth_params[:key_id]).to_not be_nil - expect(client.auth.auth_params[:access_token]).to be_nil - stop_reactor - end + context 'with :connect_automatically option set to false' do + let(:client) do + Ably::Realtime::Client.new(default_options.merge(connect_automatically: false)) + end + + it 'does not connect automatically' do + EventMachine.add_timer(1) do + expect(connection).to be_initialized + stop_reactor end + client end + + it 'connects when method #connect is called' do + connection.connect do + expect(connection).to be_connected + stop_reactor + end + end end - context 'with client_id resulting in token auth' do - let(:default_options) do - { api_key: api_key, environment: environment, protocol: protocol, client_id: SecureRandom.hex, log_level: :debug } + context 'with token auth' do + before do + # Reduce token expiry buffer to zero so that a token expired? predicate is exact + # Normally there is a buffer so that a token expiring soon is considered expired + stub_const 'Ably::Models::Token::TOKEN_EXPIRY_BUFFER', 0 end - it 'connects automatically' do - run_reactor do - connection.on(:connected) do - expect(connection.state).to eq(:connected) - expect(client.auth.auth_params[:access_token]).to_not be_nil - expect(client.auth.auth_params[:key_id]).to be_nil - stop_reactor + + context 'for renewable tokens' do + context 'that are valid for the duration of the test' do + context 'with valid pre authorised token expiring in the future' do + it 'uses the existing token created by Auth' do + client.auth.authorise(ttl: 300) + expect(client.auth).to_not receive(:request_token) + connection.once(:connected) do + stop_reactor + end + end end + + context 'with implicit authorisation' do + let(:client_options) { default_options.merge(client_id: 'force_token_auth') } + + it 'uses the token created by the implicit authorisation' do + expect(client.auth).to receive(:request_token).once.and_call_original + + connection.once(:connected) do + stop_reactor + end + end + end end - end - end - context 'initialization phases' do - let(:phases) { [:initialized, :connecting, :connected] } - let(:events_triggered) { [] } + context 'that expire' do + let(:client_options) { default_options.merge(log_level: :none) } - it 'are triggered in order' do - test_expectation = Proc.new do - expect(events_triggered).to eq(phases) - stop_reactor + before do + client.auth.authorise(ttl: ttl) + end + + context 'opening a new connection' do + context 'with recently expired token' do + let(:ttl) { 2 } + + it 'renews the token on connect' do + sleep ttl + 0.1 + expect(client.auth.current_token).to be_expired + expect(client.auth).to receive(:authorise).once.and_call_original + connection.once(:connected) do + expect(client.auth.current_token).to_not be_expired + stop_reactor + end + end + end + + context 'with immediately expiring token' do + let(:ttl) { 0.01 } + + it 'renews the token on connect, and only makes one subsequent attempt to obtain a new token' do + expect(client.auth).to receive(:authorise).twice.and_call_original + connection.once(:disconnected) do + connection.once(:failed) do |error| + expect(error.code).to eql(40140) # token expired + stop_reactor + end + end + end + + it 'uses the primary host for subsequent connection and auth requests' do + EventMachine.add_timer(1) do # wait for token to expire + connection.once(:disconnected) do + expect(client.rest_client.connection).to receive(:post).with(/requestToken$/, anything).and_call_original + + expect(client.rest_client).to_not receive(:fallback_connection) + expect(client).to_not receive(:fallback_endpoint) + + connection.once(:failed) do + connection.off + stop_reactor + end + end + end + end + end + end + + context 'when connected with a valid non-expired token' do + context 'that then expires following the connection being opened' do + let(:ttl) { 2 } + let(:channel) { client.channel('test') } + + context 'the server' do + it 'disconnects the client, and the client automatically renews the token and then reconnects', em_timeout: 10 do + expect(client.auth.current_token).to_not be_expired + + channel.attach + original_token = client.auth.current_token + + connection.once(:connected) do + started_at = Time.now + connection.once(:disconnected) do |error| + expect(Time.now - started_at >= ttl) + expect(original_token).to be_expired + expect(error.code).to eql(40140) # token expired + connection.once(:connected) do + expect(client.auth.current_token).to_not be_expired + stop_reactor + end + end + end + end + end + + skip 'retains connection state' + skip 'changes state to failed if a new token cannot be issued' + end + end end + end - run_reactor do - phases.each do |phase| - connection.on(phase) do - events_triggered << phase - test_expectation.call if events_triggered.length == phases.length + context 'for non-renewable tokens' do + context 'that are expired' do + let!(:expired_token) do + Ably::Realtime::Client.new(default_options).auth.request_token(ttl: 0.01) + end + + context 'opening a new connection' do + let(:client_options) { default_options.merge(api_key: nil, token_id: expired_token.id, log_level: :none) } + + it 'transitions state to failed', em_timeout: 10 do + EventMachine.add_timer(1) do # wait for token to expire + expect(expired_token).to be_expired + connection.once(:connected) { raise 'Connection should never connect as token has expired' } + connection.once(:failed) do + expect(client.connection.error_reason.code).to eql(40140) + stop_reactor + end + end end end + + context 'when connected' do + skip 'transitions state to failed' + end end end end - skip '#close disconnects, closes the connection immediately and changes the connection state to closed' + end - specify '#close(graceful: true) gracefully waits for the server to close the connection' do - run_reactor(8) do - connection.close - connection.on(:closed) do - expect(connection.state).to eq(:closed) - stop_reactor + context 'initialization state changes' do + let(:phases) { [:connecting, :connected] } + let(:events_triggered) { [] } + let(:test_expectation) do + Proc.new do + expect(events_triggered).to eq(phases) + stop_reactor + end + end + + def expect_ordered_phases + phases.each do |phase| + connection.on(phase) do + events_triggered << phase + test_expectation.call if events_triggered.length == phases.length end end end - it 'echoes a heart beat with #ping' do - run_reactor do - connection.on(:connected) do - connection.ping do |time_elapsed| - expect(time_elapsed).to be > 0 + context 'with implicit #connect' do + it 'are triggered in order' do + expect_ordered_phases + end + end + + context 'with explicit #connect' do + it 'are triggered in order' do + expect_ordered_phases + connection.connect + end + end + end + + context '#connect' do + it 'returns a Deferrable' do + expect(connection.connect).to be_a(EventMachine::Deferrable) + stop_reactor + end + + it 'calls the Deferrable callback on success' do + connection.connect.callback do |connection| + expect(connection).to be_a(Ably::Realtime::Connection) + expect(connection.state).to eq(:connected) + stop_reactor + end + end + + context 'when already connected' do + it 'does nothing and no further state changes are emitted' do + connection.once(:connected) do + connection.once_state_changed { raise 'State should not have changed' } + 3.times { connection.connect } + EventMachine.add_timer(1) do + expect(connection).to be_connected + connection.off stop_reactor end end end end - skip 'connects, closes gracefully and reconnects on #connect' + describe 'once connected' do + let(:connection2) { Ably::Realtime::Client.new(client_options).connection } - it 'connects, closes the connection, and then reconnects with a new connection ID' do - run_reactor(15) do + describe 'connection#id' do + it 'is a string' do + connection.connect do + expect(connection.id).to be_a(String) + stop_reactor + end + end + + it 'is unique from the connection#key' do + connection.connect do + expect(connection.id).to_not eql(connection.key) + stop_reactor + end + end + + it 'is unique for every connection' do + when_all(connection.connect, connection2.connect) do + expect(connection.id).to_not eql(connection2.id) + stop_reactor + end + end + end + + describe 'connection#key' do + it 'is a string' do + connection.connect do + expect(connection.key).to be_a(String) + stop_reactor + end + end + + it 'is unique from the connection#id' do + connection.connect do + expect(connection.key).to_not eql(connection.id) + stop_reactor + end + end + + it 'is unique for every connection' do + when_all(connection.connect, connection2.connect) do + expect(connection.key).to_not eql(connection2.key) + stop_reactor + end + end + end + end + + context 'following a previous connection being opened and closed' do + it 'reconnects and is provided with a new connection ID and connection key from the server' do connection.connect do - connection_id = connection.id + connection_id = connection.id + connection_key = connection.key + connection.close do connection.connect do expect(connection.id).to_not eql(connection_id) + expect(connection.key).to_not eql(connection_key) stop_reactor end end end end end + end - context 'failures' do - context 'with invalid app part of the key' do - let(:missing_key) { 'not_an_app.invalid_key_id:invalid_key_value' } - let(:client) do - Ably::Realtime::Client.new(default_options.merge(api_key: missing_key)) + context '#close' do + it 'returns a Deferrable' do + connection.connect do + expect(connection.close).to be_a(EventMachine::Deferrable) + stop_reactor + end + end + + it 'calls the Deferrable callback on success' do + connection.connect do + connection.close.callback do |connection| + expect(connection).to be_a(Ably::Realtime::Connection) + expect(connection.state).to eq(:closed) + stop_reactor end + end + end - it 'enters the failed state and returns a not found error' do - run_reactor do - connection.on(:failed) do |error| - expect(connection.state).to eq(:failed) - expect(error.status).to eq(404) + context 'when already closed' do + it 'does nothing and no further state changes are emitted' do + connection.once(:connected) do + connection.close do + connection.once_state_changed { raise 'State should not have changed' } + 3.times { connection.close } + EventMachine.add_timer(1) do + expect(connection).to be_closed + connection.off stop_reactor end end end end + end - context 'with invalid key ID part of the key' do - let(:invalid_key) { "#{app_id}.invalid_key_id:invalid_key_value" } - let(:client) do - Ably::Realtime::Client.new(default_options.merge(api_key: invalid_key)) + context 'when connection state is' do + let(:events) { Hash.new } + + def log_connection_changes + connection.on(:closing) { events[:closing_emitted] = true } + connection.on(:error) { events[:error_emitted] = true } + + connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + events[:closed_message_from_server_received] = true if protocol_message.action == :closed end + end - it 'enters the failed state and returns an authorization error' do - run_reactor do - connection.on(:failed) do |error| - expect(connection.state).to eq(:failed) - expect(error.status).to eq(401) + context ':initialized' do + it 'changes the connection state to :closing and then immediately :closed without sending a ProtocolMessage CLOSE' do + connection.on(:closed) do + expect(connection.state).to eq(:closed) + + EventMachine.add_timer(1) do # allow for all subscribers on incoming message bes + expect(events[:error_emitted]).to_not eql(true) + expect(events[:closed_message_from_server_received]).to_not eql(true) + expect(events[:closing_emitted]).to eql(true) stop_reactor end end + + log_connection_changes + connection.close end end - context 'with invalid WebSocket host' do - let(:client) do - Ably::Realtime::Client.new(default_options.merge(ws_host: 'non.existent.host')) + context ':connected' do + it 'changes the connection state to :closing and waits for the server to confirm connection is :closed with a ProtocolMessage' do + connection.on(:connected) do + connection.on(:closed) do + EventMachine.add_timer(1) do # allow for all subscribers on incoming message bus + expect(events[:error_emitted]).to_not eql(true) + expect(events[:closed_message_from_server_received]).to eql(true) + expect(events[:closing_emitted]).to eql(true) + stop_reactor + end + end + + log_connection_changes + connection.close + end end - it 'enters the failed state and returns an authorization error' do - run_reactor do - connection.on(:failed) do |error| - expect(connection.state).to eq(:failed) - expect(error.code).to eq(80000) - expect(error.status).to be_nil - stop_reactor + context 'with an unresponsive connection' do + let(:stubbed_timeout) { 2 } + + before do + stub_const 'Ably::Realtime::Connection::ConnectionManager::TIMEOUTS', + Ably::Realtime::Connection::ConnectionManager::TIMEOUTS.merge(close: stubbed_timeout) + + connection.on(:connected) do + # Prevent all incoming & outgoing ProtocolMessages from being processed by the client library + connection.__outgoing_protocol_msgbus__.unsubscribe + connection.__incoming_protocol_msgbus__.unsubscribe end end + + it 'force closes the connection when a :closed ProtocolMessage response is not received' do + connection.on(:connected) do + close_requested_at = Time.now + + connection.on(:closed) do + expect(Time.now - close_requested_at).to be >= stubbed_timeout + expect(connection.state).to eq(:closed) + expect(events[:error_emitted]).to_not eql(true) + expect(events[:closed_message_from_server_received]).to_not eql(true) + expect(events[:closing_emitted]).to eql(true) + stop_reactor + end + + log_connection_changes + connection.close + end + end end end end + end - it 'opens many connections simultaneously' do - run_reactor(15) do - count, connected_ids = 25, [] + context '#ping' do + it 'echoes a heart beat' do + connection.on(:connected) do + connection.ping do |time_elapsed| + expect(time_elapsed).to be > 0 + stop_reactor + end + end + end - clients = count.times.map do - Ably::Realtime::Client.new(default_options) + context 'when not connected' do + it 'raises an exception' do + expect { connection.ping }.to raise_error RuntimeError, /Cannot send a ping when connection/ + stop_reactor + end + end + end + + context 'recovery' do + let(:channel_name) { random_str } + let(:channel) { client.channel(channel_name) } + let(:publishing_client) do + Ably::Realtime::Client.new(client_options) + end + let(:publishing_client_channel) { publishing_client.channel(channel_name) } + let(:client_options) { default_options.merge(log_level: :fatal) } + + before do + # Reconfigure client library retry periods and timeouts so that tests run quickly + stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG', + Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge( + disconnected: { retry_every: 0.1, max_time_in_state: 0.2 }, + suspended: { retry_every: 0.1, max_time_in_state: 0.2 }, + ) + end + + describe '#recovery_key' do + def self.available_states + [:connecting, :connected, :disconnected, :suspended, :failed] + end + let(:available_states) { self.class.available_states} + let(:states) { Hash.new } + let(:client_options) { default_options.merge(log_level: :none) } + + it 'is composed of connection id and serial that is kept up to date with each message sent' do + connection.on(:connected) do + expected_serial = -1 + expect(connection.id).to_not be_nil + expect(connection.serial).to eql(expected_serial) + + client.channel('test').attach do |channel| + channel.publish('event', 'data') do + expected_serial += 1 # attach message received + expect(connection.serial).to eql(expected_serial) + + channel.publish('event', 'data') do + expected_serial += 1 # attach message received + expect(connection.serial).to eql(expected_serial) + stop_reactor + end + end + end end + end - clients.each do |client| - client.connection.on(:connected) do - connected_ids << client.connection.id + it "is available when connection is in one of the states: #{available_states.join(', ')}" do + connection.once(:connected) do + allow(client).to receive(:endpoint).and_return( + URI::Generic.build( + scheme: 'wss', + host: 'this.host.does.not.exist.com' + ) + ) - if connected_ids.count == 25 - expect(connected_ids.uniq.count).to eql(25) + connection.transition_state_machine! :disconnected + end + + available_states.each do |state| + connection.on(state) do + states[state.to_sym] = true if connection.recovery_key + end + end + + connection.once(:failed) do + expect(states.keys).to match_array(available_states) + stop_reactor + end + end + + it 'is nil when connection is explicitly CLOSED' do + connection.once(:connected) do + connection.close do + expect(connection.recovery_key).to be_nil + stop_reactor + end + end + end + end + + context "opening a new connection using a recently disconnected connection's #recovery_key" do + context 'connection#id and connection#key after recovery' do + let(:client_options) { default_options.merge(log_level: :none) } + + it 'remain the same' do + previous_connection_id = nil + previous_connection_key = nil + + connection.once(:connected) do + previous_connection_id = connection.id + previous_connection_key = connection.key + connection.transition_state_machine! :failed + end + + connection.once(:failed) do + recover_client = Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key)) + recover_client.connection.on(:connected) do + expect(recover_client.connection.key).to eql(previous_connection_key) + expect(recover_client.connection.id).to eql(previous_connection_id) stop_reactor end end end + end + + context 'when messages have been sent whilst the old connection is disconnected' do + describe 'the new connection' do + let(:client_options) { default_options.merge(log_level: :none) } + + it 'recovers server-side queued messages' do + channel.attach do |message| + connection.transition_state_machine! :failed + end + + connection.on(:failed) do + publishing_client_channel.publish('event', 'message') do + recover_client = Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key)) + recover_client.channel(channel_name).attach do |recover_client_channel| + recover_client_channel.subscribe('event') do |message| + expect(message.data).to eql('message') + stop_reactor + end + end + end + end + end + end + end + end + + context 'with :recover option' do + context 'with invalid syntax' do + let(:invaid_client_options) { default_options.merge(recover: 'invalid') } + + it 'raises an exception' do + expect { Ably::Realtime::Client.new(invaid_client_options) }.to raise_error ArgumentError, /Recover/ + stop_reactor + end + end + + context 'with invalid value' do + let(:client_options) { default_options.merge(recover: 'invalid:key', log_level: :fatal) } + + skip 'triggers an error on the connection object, sets the #error_reason and connects anyway' do + connection.on(:error) do |error| + expect(connection.state).to eq(:connected) + expect(connection.error_reason.message).to match(/Recover/) + expect(connection.error_reason).to eql(error) + stop_reactor + end + end + end + end + end + + context 'with many connections simultaneously', em_timeout: 15 do + let(:connection_count) { 40 } + let(:connection_ids) { [] } + let(:connection_keys) { [] } + + it 'opens each with a unique connection#id and connection#key' do + connection_count.times.map do + Ably::Realtime::Client.new(client_options) + end.each do |client| + client.connection.on(:connected) do + connection_ids << client.connection.id + connection_keys << client.connection.key + next unless connection_ids.count == connection_count + + expect(connection_ids.uniq.count).to eql(connection_count) + expect(connection_keys.uniq.count).to eql(connection_count) + stop_reactor + end + end + end + end + + context 'when a state transition is unsupported' do + let(:client_options) { default_options.merge(log_level: :none) } # silence FATAL errors + + it 'emits a StateChangeError' do + connection.connect do + connection.transition_state_machine :initialized + end + + connection.on(:error) do |error| + expect(error).to be_a(Ably::Exceptions::StateChangeError) + stop_reactor end end end end end