spec/acceptance/realtime/connection_spec.rb in ably-0.8.5 vs spec/acceptance/realtime/connection_spec.rb in ably-0.8.6

- old
+ new

@@ -82,92 +82,131 @@ end end end context 'that expire' do - let(:client_options) { default_options.merge(log_level: :none) } + let(:client_options) { default_options.merge(log_level: :error) } before do expect(client.rest_client.time.to_f).to be_within(2).of(Time.now.to_i), "Local clock is out of sync with Ably" end before do # Ensure tokens issued expire immediately after issue @original_renew_token_buffer = Ably::Auth::TOKEN_DEFAULTS.fetch(:renew_token_buffer) stub_const 'Ably::Auth::TOKEN_DEFAULTS', Ably::Auth::TOKEN_DEFAULTS.merge(renew_token_buffer: 0) - - # Authorise synchronously to ensure token has been issued - client.auth.authorise_sync(ttl: ttl) end let(:original_renew_token_buffer) { @original_renew_token_buffer } context 'opening a new connection' do - context 'with recently expired token' do + context 'with almost expired tokens' do + before do + # Authorise synchronously to ensure token has been issued + client.auth.authorise_sync(ttl: ttl) + end + let(:ttl) { 2 } - it 'renews the token on connect without changing connection state' do - connection.once(:connecting) do - sleep ttl + 0.1 - expect(client.auth.current_token_details).to be_expired - expect(client.rest_client.auth).to receive(:authorise).at_least(:once).and_call_original - connection.once(:connected) do - expect(client.auth.current_token_details).to_not be_expired + it 'renews token every time after it expires' do + started_at = Time.now.to_f + connected_times = 0 + disconnected_times = 0 + connection.on(:connected) do + connected_times += 1 + end + connection.on(:disconnected) do + disconnected_times += 1 + if disconnected_times == 3 + expect(connected_times).to eql(3) + expect(Time.now.to_f - started_at).to be > ttl * 3 + expect(Time.now.to_f - started_at).to be < (ttl * 2) * 3 stop_reactor end - connection.once_state_changed do - raise "Invalid state #{connection.state}" unless connection.state == :connected - end end end end - context 'with immediately expiring token' do + context 'with immediately expired token' do let(:ttl) { 0.001 } + let(:auth_requests) { [] } + let(:token_callback) do + Proc.new do + auth_requests << Time.now + Ably::Rest::Client.new(default_options).auth.request_token(ttl: ttl).token + end + end + let(:client_options) { default_options.merge(auth_callback: token_callback) } - it 'renews the token on connect, and only makes one subsequent attempt to obtain a new token' do - expect(client.rest_client.auth).to receive(:authorise).at_least(:twice).and_call_original + it 'renews the token on connect, and makes one immediate subsequent attempt to obtain a new token' do + started_at = Time.now.to_f connection.once(:disconnected) do - connection.once(:failed) do |connection_state_change| + connection.once(:disconnected) do |connection_state_change| expect(connection_state_change.reason.code).to eql(40140) # token expired + expect(Time.now.to_f - started_at).to be < 1000 + expect(auth_requests.count).to eql(2) stop_reactor end end end - it 'uses the primary host for subsequent connection and auth requests' do - EventMachine.add_timer(1) do # wait for token to expire + context 'when disconnected_retry_timeout is 0.5 seconds' do + let(:client_options) { default_options.merge(disconnected_retry_timeout: 0.5, auth_callback: token_callback, log_level: :error) } + + it 'renews the token on connect, and continues to attempt renew based on the retry schedule' do + started_at = Time.now.to_f + disconnect_count = 0 + connection.on(:disconnected) do |connection_state_change| + expect(connection_state_change.reason.code).to eql(40140) # token expired + disconnect_count += 1 + if disconnect_count == 6 + expect(Time.now.to_f - started_at).to be > 4 * 0.5 # at least 4 0.5 second pauses should have happened + expect(Time.now.to_f - started_at).to be < 9 # allow 1.5 seconds for each authentication cycle + stop_reactor + end + end + end + end + + context 'using implicit token auth' do + let(:client_options) { default_options.merge(use_token_auth: true, token_params: { ttl: ttl }) } + + before do + stub_const 'Ably::Models::TokenDetails::TOKEN_EXPIRY_BUFFER', -10 # ensure client lib thinks token is still valid + end + + it 'uses the primary host for subsequent connection and auth requests' do connection.once(:disconnected) do expect(client.rest_client.connection).to receive(:post). with(/requestToken$/, anything). - at_least(:once). + exactly(:once). and_call_original expect(client.rest_client).to_not receive(:fallback_connection) expect(client).to_not receive(:fallback_endpoint) - connection.once(:failed) do - connection.off + connection.once(:disconnected) do stop_reactor end end end end end end context 'when connected with a valid non-expired token' do context 'that then expires following the connection being opened' do - let(:ttl) { 5 } - let(:channel) { client.channel(random_str) } + let(:ttl) { 5 } + let(:channel_name) { random_str } + let(:channel) { client.channel(channel_name) } + let(:client_options) { default_options.merge(use_token_auth: true, token_params: { ttl: ttl }) } context 'the server' do it 'disconnects the client, and the client automatically renews the token and then reconnects', em_timeout: 15 do - original_token = client.auth.current_token_details - expect(original_token).to_not be_expired - connection.once(:connected) do + original_token = client.auth.current_token_details + expect(original_token).to_not be_expired started_at = Time.now connection.once(:disconnected) do |connection_state_change| expect(connection_state_change.reason.code).to eq(40140) # Token expired # Token has expired, so now ensure it is not used again @@ -187,12 +226,90 @@ channel.attach end end - skip 'retains connection state' - skip 'changes state to failed if a new token cannot be issued' + context 'connection state' do + let(:ttl) { 4 } + let(:auth_requests) { [] } + let(:token_callback) do + Proc.new do + sleep 2 + auth_requests << Time.now + Ably::Rest::Client.new(default_options).auth.request_token(ttl: ttl).token + end + end + let(:client_options) { default_options.merge(auth_callback: token_callback) } + let(:publishing_client) { auto_close Ably::Realtime::Client.new(default_options) } + let(:publishing_channel) { publishing_client.channels.get(channel_name) } + let(:messages_received) { [] } + + def publish_and_check_first_disconnect + 10.times.each { |index| publishing_channel.publish('event', index.to_s) } + channel.subscribe('event') do |message| + messages_received << message.data.to_i + if messages_received.count == 10 + expect(messages_received).to match(10.times) + expect(auth_requests.count).to eql(2) + EventMachine.add_timer(1) do + channel.unsubscribe 'event' + yield + end + end + end + end + + def publish_and_check_second_disconnect + 10.times.each { |index| publishing_channel.publish('event', (index + 10).to_s) } + channel.subscribe('event') do |message| + messages_received << message.data.to_i + if messages_received.count == 20 + expect(messages_received).to match(20.times) + expect(auth_requests.count).to eql(3) + stop_reactor + end + end + end + + it 'retains messages published when disconnected twice during authentication', em_timeout: 20 do + publishing_channel.attach do + channel.attach do + connection.once(:disconnected) do + publish_and_check_first_disconnect do + connection.once(:disconnected) do + publish_and_check_second_disconnect + end + end + end + end + end + end + end + + context 'and subsequent token is invalid' do + let(:ttl) { 2 } + let(:token_callback) do + Proc.new do + if @token_issued + "#{app_id}.invalid-token-invalid-token-invalid-token" + else + @token_issued = true + Ably::Rest::Client.new(default_options).auth.request_token(ttl: ttl).token + end + end + end + let(:client_options) { default_options.merge(auth_callback: token_callback, log_level: :none) } + + it 'transitions the connection to the failed state' do + connection.once(:disconnected) do + connection.once(:failed) do + expect(connection.error_reason.code).to eql(40101) + stop_reactor + end + end + end + end end end end end @@ -226,10 +343,54 @@ context 'when connected' do skip 'transitions state to failed' end end end + + context 'with opaque token string that contain an implicit client_id' do + let(:client_options) { default_options.merge(token: token_string, key: nil) } + let(:rest_auth_client) { Ably::Rest::Client.new(default_options.merge(key: api_key)) } + let(:token_string) { rest_auth_client.auth.request_token(client_id: client_id).token } + + context 'string' do + let(:client_id) { random_str } + + it 'sets the Client#client_id and Auth#client_id once CONNECTED' do + expect(client.client_id).to be_nil + client.connection.once(:connected) do + expect(client.client_id).to eql(client_id) + stop_reactor + end + end + + context 'that is incompatible with the current client client_id' do + let(:client_id) { random_str } + let(:client_options) { default_options.merge(client_id: 'incompatible', token: token_string, key: nil, log_level: :none) } + + it 'fails the connection' do + expect(client.client_id).to eql('incompatible') + client.connection.once(:failed) do + expect(client.client_id).to eql('incompatible') + expect(client.connection.error_reason.code).to eql(40101) # Invalid clientId for credentials + stop_reactor + end + end + end + end + + context 'wildcard' do + let(:client_id) { '*' } + + it 'configures the Client#client_id and Auth#client_id with a wildcard once CONNECTED' do + expect(client.client_id).to be_nil + client.connection.once(:connected) do + expect(client.client_id).to eql('*') + stop_reactor + end + end + end + end end end context 'initialization state changes' do let(:phases) { [:connecting, :connected] } @@ -275,10 +436,54 @@ expect(connection.state).to eq(:connected) stop_reactor end end + it 'calls the provided block on success even if state changes to disconnected first' do + been_disconnected = false + + connection.once(:disconnected) do + been_disconnected = true + end + connection.once(:connecting) do + close_if_transport_available = proc do + EventMachine.add_timer(0.001) do + if connection.transport + connection.transport.close_connection_after_writing + else + close_if_transport_available.call + end + end + end + close_if_transport_available.call + end + + connection.connect do + expect(connection.state).to eq(:connected) + expect(been_disconnected).to be_truthy + stop_reactor + end + end + + context 'with invalid auth details' do + let(:client_options) { default_options.merge(key: 'this.is:invalid', log_level: :none) } + + it 'calls the Deferrable errback only once on connection failure' do + errback_called = false + connection.connect.errback do + expect(connection.state).to eq(:failed) + + raise 'Errback already called' if errback_called + errback_called = true + + connection.connect.errback do + EventMachine.add_timer(0.5) { stop_reactor } + end + end + end + end + context 'when already connected' do it 'does nothing and no further state changes are emitted' do connection.once(:connected) do connection.once_state_changed { raise 'State should not have changed' } 3.times { connection.connect } @@ -507,16 +712,14 @@ connection.close end end context 'with an unresponsive connection' do - let(:stubbed_timeout) { 2 } + let(:custom_timeout) { 2 } + let(:client_options) { default_options.merge(realtime_request_timeout: custom_timeout) } before do - stub_const 'Ably::Realtime::Connection::ConnectionManager::TIMEOUTS', - Ably::Realtime::Connection::ConnectionManager::TIMEOUTS.merge(close: stubbed_timeout) - connection.on(:connected) do # Prevent all incoming & outgoing ProtocolMessages from being processed by the client library connection.__outgoing_protocol_msgbus__.unsubscribe connection.__incoming_protocol_msgbus__.unsubscribe end @@ -525,11 +728,11 @@ it 'force closes the connection when a :closed ProtocolMessage response is not received' do connection.on(:connected) do close_requested_at = Time.now connection.on(:closed) do - expect(Time.now - close_requested_at).to be >= stubbed_timeout + expect(Time.now - close_requested_at).to be >= custom_timeout expect(connection.state).to eq(:closed) expect(events[:error_emitted]).to_not eql(true) expect(events[:closed_message_from_server_received]).to_not eql(true) expect(events[:closing_emitted]).to eql(true) stop_reactor @@ -569,37 +772,59 @@ end connection.ping { raise 'Forced exception' } end end end + + context 'when ping times out' do + let(:client_options) { default_options.merge(log_level: :error) } + + it 'logs a warning' do + connection.once(:connected) do + allow(connection).to receive(:defaults).and_return(connection.defaults.merge(realtime_request_timeout: 0.0001)) + expect(connection.logger).to receive(:warn).with(/Ping timed out/) do + stop_reactor + end + connection.ping + end + end + + it 'yields to the block with a nil value' do + connection.once(:connected) do + allow(connection).to receive(:defaults).and_return(connection.defaults.merge(realtime_request_timeout: 0.0001)) + connection.ping do |time_elapsed| + expect(time_elapsed).to be_nil + stop_reactor + end + end + end + end end context 'recovery' do let(:channel_name) { random_str } let(:channel) { client.channel(channel_name) } let(:publishing_client) do auto_close Ably::Realtime::Client.new(client_options) end let(:publishing_client_channel) { publishing_client.channel(channel_name) } - let(:client_options) { default_options.merge(log_level: :fatal) } - before do - # Reconfigure client library retry periods and timeouts so that tests run quickly - stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG', - Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge( - disconnected: { retry_every: 0.1, max_time_in_state: 0.2 }, - suspended: { retry_every: 0.1, max_time_in_state: 0.2 }, - ) + let(:client_options) do + default_options.merge( + log_level: :none, + disconnected_retry_timeout: 0.1, + suspended_retry_timeout: 0.1, + connection_state_ttl: 0.2 + ) end describe '#recovery_key' do def self.available_states [:connecting, :connected, :disconnected, :suspended, :failed] end let(:available_states) { self.class.available_states } let(:states) { Hash.new } - let(:client_options) { default_options.merge(log_level: :none) } let(:channel) { client.channel(random_str) } it 'is composed of connection key and serial that is kept up to date with each message ACK received' do connection.on(:connected) do expected_serial = -1 @@ -639,10 +864,15 @@ connection.on(state) do states[state.to_sym] = true if connection.recovery_key end end + connection.once(:suspended) do + error_message = Ably::Models::ProtocolMessage.new(action: 9, error: { message: 'force failure' }) + connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message + end + connection.once(:failed) do expect(states.keys).to match_array(available_states) stop_reactor end end @@ -657,12 +887,10 @@ end end context "opening a new connection using a recently disconnected connection's #recovery_key" do context 'connection#id and connection#key after recovery' do - let(:client_options) { default_options.merge(log_level: :none) } - it 'remains the same' do previous_connection_id = nil previous_connection_key = nil connection.once(:connected) do @@ -699,12 +927,10 @@ end end context 'when messages have been sent whilst the old connection is disconnected' do describe 'the new connection' do - let(:client_options) { default_options.merge(log_level: :none) } - it 'recovers server-side queued messages' do channel.attach do connection.transition_state_machine! :failed end @@ -772,11 +998,11 @@ let(:connection_ids) { [] } let(:connection_keys) { [] } it 'opens each with a unique connection#id and connection#key' do connection_count.times.map do - Ably::Realtime::Client.new(client_options) + auto_close Ably::Realtime::Client.new(client_options) end.each do |client| client.connection.on(:connected) do connection_ids << client.connection.id connection_keys << client.connection.key next unless connection_ids.count == connection_count @@ -941,18 +1167,16 @@ end end end context 'when connection enters the :suspended state' do - let(:client_options) { default_options.merge(:log_level => :fatal) } - - before do - # Reconfigure client library retry periods so that client stays in suspended state - stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG', - Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge( - disconnected: { retry_every: 0.01, max_time_in_state: 0.05 }, - suspended: { retry_every: 60, max_time_in_state: 60 } - ) + let(:client_options) do + default_options.merge( + log_level: :fatal, + disconnected_retry_timeout: 0.02, + suspended_retry_timeout: 60, + connection_state_ttl: 0.05 + ) end it 'detaches the channels and prevents publishing of messages on those channels' do channel.attach do channel.once(:detached) do