# encoding: utf-8 require 'spec_helper' describe Ably::Realtime::Connection, 'failures', :event_machine do let(:connection) { client.connection } vary_by_protocol do let(:default_options) do { key: api_key, environment: environment, protocol: protocol } end let(:client_options) { default_options } let(:client) do auto_close Ably::Realtime::Client.new(client_options) end let(:rest_client) do Ably::Rest::Client.new(default_options) end context 'authentication failure' do let(:client_options) do default_options.merge(key: invalid_key, log_level: :none) end context 'when API key is invalid' do context 'with invalid app part of the key' do let(:invalid_key) { 'not_an_app.invalid_key_name:invalid_key_value' } it 'enters the failed state and returns a not found error' do connection.on(:failed) do |connection_state_change| error = connection_state_change.reason expect(connection.state).to eq(:failed) # TODO: Check error type is an InvalidToken exception expect(error.status).to eq(404) expect(error.code).to eq(40400) # not found stop_reactor end end end context 'with invalid key name part of the key' do let(:invalid_key) { "#{app_id}.invalid_key_name:invalid_key_value" } it 'enters the failed state and returns an authorization error' do connection.on(:failed) do |connection_state_change| error = connection_state_change.reason expect(connection.state).to eq(:failed) # TODO: Check error type is a TokenNotFound exception expect(error.status).to eq(401) expect(error.code).to eq(40400) # not found stop_reactor end end end end context 'with auth_url' do context 'opening a new connection' do context 'request fails due to network failure' do let(:client_options) { default_options.reject { |k, v| k == :key }.merge(auth_url: "http://#{random_str}.domain.will.never.resolve.to/path", log_level: :fatal) } specify 'the connection moves to the disconnected state and tries again, returning again to the disconnected state (#RSA4c, #RSA4c1, #RSA4c2)' do states = Hash.new { |hash, key| 
hash[key] = [] }

              connection.once(:connected) { raise "Connection can never move to connected because of auth failures" }

              connection.on do |connection_state|
                states[connection_state.current.to_sym] << Time.now

                # Only assert once the connection has entered :disconnected for the second time
                if states[:disconnected].count == 2 && connection_state.current == :disconnected
                  expect(connection.error_reason).to be_a(Ably::Exceptions::ConnectionError)
                  expect(connection.error_reason.message).to match(/auth_url/)
                  # Check the recorded states again two seconds later, then finish
                  EventMachine.add_timer(2) do
                    expect(states.keys).to include(:connecting, :disconnected)
                    expect(states[:connecting].count).to eql(2)
                    expect(states[:connected].count).to eql(0)
                    stop_reactor
                  end
                end
              end
            end
          end

          context 'request fails due to invalid content', :webmock do
            let(:auth_endpoint) { "http://#{random_str}.domain.will.never.resolve.to/authenticate" }
            let(:client_options) { default_options.reject { |k, v| k == :key }.merge(auth_url: auth_endpoint, log_level: :fatal) }

            # The stubbed auth endpoint answers 200 with an empty text/html body
            before do
              stub_request(:get, auth_endpoint).
                to_return(:status => 200, :body => "", :headers => { "Content-type" => "text/html" })
            end

            specify 'the connection moves to the disconnected state and tries again, returning again to the disconnected state (#RSA4c, #RSA4c1, #RSA4c2)' do
              # Timestamps of every observed state change, keyed by state name
              states = Hash.new { |hash, key| hash[key] = [] }

              connection.once(:connected) { raise "Connection can never move to connected because of auth failures" }

              connection.on do |connection_state|
                states[connection_state.current.to_sym] << Time.now

                # Only assert once the connection has entered :disconnected for the second time
                if states[:disconnected].count == 2 && connection_state.current == :disconnected
                  expect(connection.error_reason).to be_a(Ably::Exceptions::ConnectionError)
                  expect(connection.error_reason.message).to match(/auth_url/)
                  expect(connection.error_reason.message).to match(/Content Type.*not supported/)
                  EventMachine.add_timer(2) do
                    expect(states.keys).to include(:connecting, :disconnected)
                    expect(states[:connecting].count).to eql(2)
                    expect(states[:connected].count).to eql(0)
                    stop_reactor
                  end
                end
              end
            end
          end

          context 'request fails due to slow response and subsequent timeout',
:webmock, em_timeout: (Ably::Rest::Client::HTTP_DEFAULTS.fetch(:request_timeout) + 5) * 2 do
            let(:auth_url) { "http://#{random_str}.domain.will.be.stubbed/path" }
            let(:client_options) { default_options.reject { |k, v| k == :key }.merge(auth_url: auth_url, log_level: :fatal) }

            # Timeout +5 seconds, beyond default allowed timeout
            before do
              stub_request(:get, auth_url).
                to_return do |request|
                  sleep Ably::Rest::Client::HTTP_DEFAULTS.fetch(:request_timeout) + 5
                  { status: [500, "Internal Server Error"] }
                end
            end

            specify 'the connection moves to the disconnected state and tries again, returning again to the disconnected state (#RSA4c, #RSA4c1, #RSA4c2)' do
              # Timestamps of every observed state change, keyed by state name
              states = Hash.new { |hash, key| hash[key] = [] }

              connection.once(:connected) { raise "Connection can never move to connected because of auth failures" }

              connection.on do |connection_state|
                states[connection_state.current.to_sym] << Time.now

                # Only assert once the connection has entered :disconnected for the second time
                if states[:disconnected].count == 2 && connection_state.current == :disconnected
                  expect(connection.error_reason).to be_a(Ably::Exceptions::ConnectionError)
                  expect(connection.error_reason.message).to match(/auth_url/)
                  EventMachine.add_timer(2) do
                    expect(states.keys).to include(:connecting, :disconnected)
                    expect(states[:connecting].count).to eql(2)
                    expect(states[:connected].count).to eql(0)
                    stop_reactor
                  end
                end
              end
            end
          end

          context 'request fails once due to slow response but succeeds the second time' do
            let(:auth_url) { "http://#{random_str}.domain.will.be.stubbed/path" }
            let(:client_options) { default_options.reject { |k, v| k == :key }.merge(auth_url: auth_url, log_level: :fatal) }

            # The first stubbed response sleeps for exactly the default request timeout (no extra margin)
            # and then answers 500; the chained second response is configured after `.then.`
            # NOTE(review): WebMock is enabled here by hand after the real token request rather than
            # through the :webmock tag used by the sibling contexts — confirm it is disabled again elsewhere
            before do
              token_response = Ably::Rest::Client.new(default_options).auth.request_token
              WebMock.enable!
              stub_request(:get, auth_url).
                to_return do |request|
                  sleep Ably::Rest::Client::HTTP_DEFAULTS.fetch(:request_timeout)
                  { status: [500, "Internal Server Error"] }
                end.then.
to_return(:status => 201, :body => token_response.to_json, :headers => { 'Content-Type' => 'application/json' }) end specify 'the connection moves to the disconnected state and tries again, returning again to the disconnected state (#RSA4c, #RSA4c1, #RSA4c2)' do states = Hash.new { |hash, key| hash[key] = [] } connection.once(:connected) do expect(states[:disconnected].count).to eql(1) expect(states[:connecting].count).to eql(2) stop_reactor end connection.on do |connection_state| states[connection_state.current.to_sym] << Time.now end end end end context 'existing CONNECTED connection' do context 'authorize request failure leaves connection in existing condition' do let(:auth_options) { { auth_url: "http://#{random_str}.domain.will.never.resolve.to/path" } } let(:client_options) { default_options.merge(use_token_auth: true, log_level: :fatal) } specify 'the connection remains in the CONNECTED state and authorize fails (#RSA4c, #RSA4c1, #RSA4c3)' do connection.once(:connected) do connection.on { raise "State should not change and should stay connected" } client.auth.authorize(nil, auth_options).tap do |deferrable| deferrable.callback { raise "Authorize should not succeed" } deferrable.errback do |err| expect(err).to be_a(Ably::Exceptions::ConnectionError) expect(err.message).to match(/auth_url/) EventMachine.add_timer(1) do expect(connection).to be_connected connection.off stop_reactor end end end end end end end end context 'with auth_callback' do context 'opening a new connection' do context 'when callback fails due to an exception' do let(:client_options) { default_options.reject { |k, v| k == :key }.merge(auth_callback: lambda { |token_params| raise "Cannot issue token" }, log_level: :fatal) } it 'the connection moves to the disconnected state and tries again, returning again to the disconnected state (#RSA4c, #RSA4c1, #RSA4c2)' do states = Hash.new { |hash, key| hash[key] = [] } connection.once(:connected) { raise "Connection can never move to connected 
because of auth failures" } connection.on do |connection_state| states[connection_state.current.to_sym] << Time.now if states[:disconnected].count == 2 && connection_state.current == :disconnected expect(connection.error_reason).to be_a(Ably::Exceptions::ConnectionError) expect(connection.error_reason.message).to match(/auth_callback/) EventMachine.add_timer(2) do expect(states.keys).to include(:connecting, :disconnected) expect(states[:connecting].count).to eql(2) expect(states[:connected].count).to eql(0) stop_reactor end end end end end context 'existing CONNECTED connection' do context 'when callback fails due to the request taking longer than realtime_request_timeout' do let(:request_timeout) { 3 } let(:client_options) { default_options.merge( realtime_request_timeout: request_timeout, use_token_auth: true, log_level: :fatal) } let(:auth_options) { { auth_callback: lambda { |token_params| sleep 10 }, } } it 'the authorization request fails as configured in the realtime_request_timeout (#RSA4c, #RSA4c1, #RSA4c3)' do connection.once(:connected) do connection.on { raise "State should not change and should stay connected" } client.auth.authorize(nil, auth_options).tap do |deferrable| deferrable.callback { raise "Authorize should not succeed" } deferrable.errback do |err| expect(err).to be_a(Ably::Exceptions::ConnectionError) expect(err.message).to match(/auth_callback/) EventMachine.add_timer(1) do expect(connection).to be_connected connection.off stop_reactor end end end end end end end end end end context 'automatic connection retry' do context 'with invalid WebSocket host' do let(:retry_every_for_tests) { 0.2 } let(:max_time_in_state_for_tests) { 0.6 } let(:client_failure_options) do default_options.merge( log_level: :none, disconnected_retry_timeout: retry_every_for_tests, suspended_retry_timeout: retry_every_for_tests, max_connection_state_ttl: max_time_in_state_for_tests ) end # retry immediately after failure, then one retry every :retry_every_for_tests 
let(:expected_retry_attempts) { 1 + (max_time_in_state_for_tests / retry_every_for_tests).round } let(:state_changes) { Hash.new { |hash, key| hash[key] = 0 } } let(:timer) { Hash.new } let(:client_options) do client_failure_options.merge(realtime_host: 'non.existent.host') end def count_state_changes EventMachine.next_tick do %w(connecting disconnected failed suspended).each do |state| connection.on(state.to_sym) { state_changes[state.to_sym] += 1 } end end end def start_timer timer[:start] = Time.now end def time_passed Time.now.to_f - timer[:start].to_f end context 'when disconnected' do it 'enters the suspended state after multiple attempts to connect' do connection.on(:failed) { raise 'Connection should not have reached :failed state yet' } count_state_changes && start_timer connection.once(:suspended) do expect(connection.state).to eq(:suspended) expect(state_changes[:connecting]).to eql(expected_retry_attempts + 1) # allow for initial connecting attempt expect(state_changes[:disconnected]).to eql(expected_retry_attempts) expect(time_passed).to be > max_time_in_state_for_tests stop_reactor end end context 'for the first time' do let(:client_options) do default_options.merge(realtime_host: 'non.existent.host', disconnected_retry_timeout: 2, log_level: :error) end it 'reattempts connection immediately and then waits disconnected_retry_timeout for a subsequent attempt' do expect(connection.defaults[:disconnected_retry_timeout]).to eql(2) connection.once(:disconnected) do started_at = Time.now.to_f connection.once(:disconnected) do expect(Time.now.to_f - started_at).to be < 1 started_at = Time.now.to_f connection.once(:disconnected) do expect(Time.now.to_f - started_at).to be > 2 stop_reactor end end end end end describe '#close' do it 'transitions connection state to :closed' do connection.on(:connected) { raise 'Connection should not have reached :connected state' } connection.on(:failed) { raise 'Connection should not have reached :failed state yet' } 
connection.once(:disconnected) do expect(connection.state).to eq(:disconnected) connection.on(:closed) do expect(connection.state).to eq(:closed) stop_reactor end connection.close end end end end context 'when connection state is :suspended' do it 'stays in the suspended state after any number of reconnection attempts' do connection.on(:connected) { raise 'Connection should not have reached :connected state' } connection.once(:suspended) do count_state_changes && start_timer EventMachine.add_timer((retry_every_for_tests + 0.1) * 10) do expect(connection.state).to eq(:suspended) expect(state_changes[:connecting]).to be >= 10 expect(state_changes[:suspended]).to be >= 10 expect(state_changes[:disconnected]).to eql(0) stop_reactor end end end context 'for the first time' do let(:client_options) do default_options.merge(suspended_retry_timeout: 2, max_connection_state_ttl: 0, log_level: :error) end it 'waits suspended_retry_timeout before attempting to reconnect' do expect(client.connection.defaults[:suspended_retry_timeout]).to eql(2) connection.once(:connected) do connection.transition_state_machine :suspended allow(connection).to receive(:current_host).and_return('does.not.exist.com') started_at = Time.now.to_f connection.once(:connecting) do expect(Time.now.to_f - started_at).to be > 1.75 started_at = Time.now.to_f connection.once(:connecting) do expect(Time.now.to_f - started_at).to be > 1.75 connection.once(:suspended) do stop_reactor end end end end end end describe '#close' do it 'transitions connection state to :closed' do connection.on(:connected) { raise 'Connection should not have reached :connected state' } connection.once(:suspended) do expect(connection.state).to eq(:suspended) connection.on(:closed) do expect(connection.state).to eq(:closed) stop_reactor end connection.close end end end end context 'when connection state is :failed' do describe '#close' do it 'will not transition state to :close and fails with an InvalidStateChange exception' do 
connection.on(:connected) { raise 'Connection should not have reached :connected state' }

              # Once suspended, force the state machine into :failed
              connection.once(:suspended) do
                connection.transition_state_machine :failed
              end

              connection.once(:failed) do
                expect(connection.state).to eq(:failed)

                connection.close.errback do |error|
                  expect(error).to be_a(Ably::Exceptions::InvalidStateChange)
                  expect(error.message).to match(/Unable to transition from failed => closing/)
                  stop_reactor
                end
              end
            end
          end
        end

        context '#error_reason' do
          # One example is generated per state
          [:disconnected, :suspended, :failed].each do |state|
            it "contains the error when state is #{state}" do
              connection.on(state) do |connection_state_change|
                error = connection_state_change.reason
                expect(connection.error_reason).to eq(error)
                expect(connection.error_reason.code).to eql(80000)
                stop_reactor
              end

              # Push the connection on to :failed, carrying the same reason, once it is suspended
              connection.once(:suspended) do |connection_state_change|
                connection.transition_state_machine :failed, reason: connection_state_change.reason
              end
            end
          end

          it 'is reset to nil when :connected' do
            # NOTE(review): the block argument is named `error` but is not used in this example
            connection.once(:disconnected) do |error|
              # stub the host so that the connection connects
              allow(connection).to receive(:determine_host).and_yield(TestApp.instance.realtime_host)
              connection.once(:connected) do
                expect(connection.error_reason).to be_nil
                stop_reactor
              end
            end
          end

          it 'is reset to nil when :closed' do
            # NOTE(review): the block argument is named `error` but is not used in this example
            connection.once(:disconnected) do |error|
              connection.close do
                expect(connection.error_reason).to be_nil
                stop_reactor
              end
            end
          end
        end
      end

      describe '#connect' do
        let(:timeout) { 1.5 }

        let(:client_options) do
          default_options.merge(
            log_level: :none,
            realtime_request_timeout: timeout,
          )
        end

        before do
          connection.on(:connected) { raise "Connection should not open in this test as CONNECTED ProtocolMessage is never received" }

          connection.once(:connecting) do
            # don't process any incoming ProtocolMessages so the connection never opens
            connection.__incoming_protocol_msgbus__.unsubscribe
          end
        end

        context 'connection opening times out' do
          it 'attempts to reconnect' do
            started_at = Time.now

            connection.once(:disconnected)
do
              # The disconnect must not be reported before the configured request timeout has elapsed
              expect(Time.now.to_f - started_at.to_f).to be > timeout
              connection.once(:connecting) do
                stop_reactor
              end
            end

            connection.connect
          end

          context 'when retry intervals are stubbed to attempt reconnection quickly' do
            let(:client_options) do
              default_options.merge(
                log_level: :error,
                disconnected_retry_timeout: 0.1,
                suspended_retry_timeout: 0.1,
                max_connection_state_ttl: 0.2,
                realtime_host: 'non.existent.host'
              )
            end

            it 'never calls the provided success block', em_timeout: 10 do
              connection.connect do
                raise 'success block should not have been called'
              end

              # Finish once the connection has entered :suspended twice
              connection.once(:suspended) do
                connection.once(:suspended) do
                  stop_reactor
                end
              end
            end
          end
        end
      end
    end

    context 'connection resume' do
      let(:channel_name) { random_str }
      let(:channel) { client.channel(channel_name) }
      # Second client built from the same client_options, used to publish on the same channel name
      let(:publishing_client) do
        auto_close Ably::Realtime::Client.new(client_options)
      end
      let(:publishing_client_channel) { publishing_client.channel(channel_name) }
      let(:client_options) { default_options.merge(log_level: :none) }

      # Makes the example raise if the connection ever enters :suspended or :failed
      def fail_if_suspended_or_failed
        connection.on(:suspended) { raise 'Connection should not have reached :suspended state' }
        connection.on(:failed) { raise 'Connection should not have reached :failed state' }
      end

      context 'when DISCONNECTED ProtocolMessage received from the server' do
        it 'reconnects automatically and immediately' do
          fail_if_suspended_or_failed

          connection.once(:connected) do
            connection.once(:disconnected) do
              disconnected_at = Time.now.to_f
              connection.once(:connecting) do
                expect(Time.now.to_f).to be_within(0.25).of(disconnected_at)
                connection.once(:connected) do
                  state_history = connection.state_history.map { |transition| transition[:state].to_sym }
                  expect(state_history).to eql([:connecting, :connected, :disconnected, :connecting, :connected])
                  stop_reactor
                end
              end
            end
            # Inject a DISCONNECTED ProtocolMessage on the incoming message bus
            protocol_message = Ably::Models::ProtocolMessage.new(action: Ably::Models::ProtocolMessage::ACTION.Disconnected.to_i)
            connection.__incoming_protocol_msgbus__.publish :protocol_message, protocol_message
          end
        end

context 'connection state freshness is monitored' do
          it 'resumes connections when disconnected within the connection_state_ttl period (#RTN15g)' do
            connection.once(:connected) do
              connection_id = connection.id
              reconnected_with_resume = false

              # Make sure the next connect has the resume param
              allow(EventMachine).to receive(:connect).and_wrap_original do |original, *args, &block|
                # NOTE(review): assumes the connection URL is the fifth argument passed to EventMachine.connect — confirm against the transport
                url = args[4]
                uri = URI.parse(url)
                expect(CGI::parse(uri.query)['resume'][0]).to_not be_empty
                reconnected_with_resume = true
                original.call(*args, &block)
              end

              connection.once(:disconnected) do
                disconnected_at = Time.now

                connection.once(:connecting) do
                  expect(Time.now.to_f - disconnected_at.to_f).to be < connection.connection_state_ttl
                  connection.once(:connected) do |state_change|
                    expect(connection.id).to eql(connection_id)
                    expect(reconnected_with_resume).to be_truthy
                    stop_reactor
                  end
                end
              end

              # Drop the transport to trigger the disconnect
              connection.transport.unbind
            end
          end

          context 'when connection_state_ttl period has passed since being disconnected' do
            let(:client_options) do
              default_options.merge(
                disconnected_retry_timeout: 4,
                suspended_retry_timeout: 8,
                max_connection_state_ttl: 2,
              )
            end

            it 'clears the local connection state and uses a new connection when the connection_state_ttl period has passed (#RTN15g)' do
              connection.once(:connected) do
                connection_id = connection.id
                resumed_with_clean_connection = false

                connection.once(:disconnected) do
                  disconnected_at = Time.now

                  connection.once(:connecting) do
                    connection.once(:disconnected) do
                      # Make sure the next connect does not have the resume param
                      allow(EventMachine).to receive(:connect).and_wrap_original do |original, *args, &block|
                        # NOTE(review): assumes the connection URL is the fifth argument passed to EventMachine.connect — confirm against the transport
                        url = args[4]
                        uri = URI.parse(url)
                        expect(CGI::parse(uri.query)['resume']).to be_empty
                        resumed_with_clean_connection = true
                        original.call(*args, &block)
                      end

                      allow(connection.details).to receive(:max_idle_interval).and_return(0)
                      connection.__incoming_protocol_msgbus__.plugin_listeners

                      connection.once(:connecting) do
                        expect(Time.now.to_f - disconnected_at.to_f).to
be > connection.connection_state_ttl
                        connection.once(:connected) do |state_change|
                          expect(connection.id).to_not eql(connection_id)
                          expect(resumed_with_clean_connection).to be_truthy
                          stop_reactor
                        end
                      end
                    end

                    # Disconnect the transport and trigger a new disconnected state
                    wait_until(lambda { connection.transport }) do
                      connection.transport.unbind
                    end
                  end

                  connection.__incoming_protocol_msgbus__.unplug_listeners
                end

                connection.transport.unbind
              end
            end
          end

          context 'when connection_state_ttl period has passed since last activity on the connection' do
            let(:client_options) do
              default_options.merge(
                max_connection_state_ttl: 2,
              )
            end

            it 'does not clear the local connection state when the connection_state_ttl period has passed since last activity, but the idle timeout has not passed (#RTN15g1, #RTN15g2)' do
              expect(connection.connection_state_ttl).to eql(client_options.fetch(:max_connection_state_ttl))

              connection.once(:connected) do
                connection_id = connection.id
                resumed_connection = false

                connection.once(:disconnected) do
                  # NOTE(review): disconnected_at is assigned but not read in this example
                  disconnected_at = Time.now

                  # Pretend the connection was last confirmed alive just over connection_state_ttl ago
                  allow(connection).to receive(:time_since_connection_confirmed_alive?).and_return(connection.connection_state_ttl + 1)

                  # Make sure the next connect still has the resume param
                  allow(EventMachine).to receive(:connect).and_wrap_original do |original, *args, &block|
                    # NOTE(review): assumes the connection URL is the fifth argument passed to EventMachine.connect — confirm against the transport
                    url = args[4]
                    uri = URI.parse(url)
                    expect(CGI::parse(uri.query)['resume']).to_not be_empty
                    resumed_connection = true
                    original.call(*args, &block)
                  end

                  connection.once(:connecting) do
                    connection.once(:connected) do |state_change|
                      expect(connection.id).to eql(connection_id)
                      expect(resumed_connection).to be_truthy
                      stop_reactor
                    end
                  end
                end

                connection.transport.unbind
              end
            end

            it 'clears the local connection state and uses a new connection when the connection_state_ttl + max_idle_interval period has passed since last activity (#RTN15g1, #RTN15g2)' do
              expect(connection.connection_state_ttl).to eql(client_options.fetch(:max_connection_state_ttl))

              connection.once(:connected) do
connection_id = connection.id
                resumed_with_clean_connection = false

                connection.once(:disconnected) do
                  # NOTE(review): disconnected_at is assigned but not read in this example
                  disconnected_at = Time.now

                  # Pretend more than connection_state_ttl + max_idle_interval has passed since the last activity
                  pseudo_time_passed = connection.connection_state_ttl + connection.details.max_idle_interval + 1
                  allow(connection).to receive(:time_since_connection_confirmed_alive?).and_return(pseudo_time_passed)

                  # Make sure the next connect does not have the resume param
                  allow(EventMachine).to receive(:connect).and_wrap_original do |original, *args, &block|
                    # NOTE(review): assumes the connection URL is the fifth argument passed to EventMachine.connect — confirm against the transport
                    url = args[4]
                    uri = URI.parse(url)
                    expect(CGI::parse(uri.query)['resume']).to be_empty
                    resumed_with_clean_connection = true
                    original.call(*args, &block)
                  end

                  connection.once(:connecting) do
                    connection.once(:connected) do |state_change|
                      expect(connection.id).to_not eql(connection_id)
                      expect(resumed_with_clean_connection).to be_truthy
                      stop_reactor
                    end
                  end
                end

                connection.transport.unbind
              end
            end

            it 'still reattaches the channels automatically following a new connection being established (#RTN15g2)' do
              connection.once(:connected) do
                connection_id = connection.id
                resumed_with_clean_connection = false
                channel_emitted_an_attached = false

                channel.attach do
                  # Registered after the initial attach completes, so it observes the next ATTACHED
                  channel.once(:attached) do |channel_state_change|
                    expect(channel_state_change.resumed).to be_falsey
                    channel_emitted_an_attached = true
                  end

                  connection.once(:disconnected) do
                    # NOTE(review): disconnected_at is assigned but not read in this example
                    disconnected_at = Time.now

                    # Pretend more than connection_state_ttl + max_idle_interval has passed since the last activity
                    pseudo_time_passed = connection.connection_state_ttl + connection.details.max_idle_interval + 1
                    allow(connection).to receive(:time_since_connection_confirmed_alive?).and_return(pseudo_time_passed)

                    # Make sure the next connect does not have the resume param
                    allow(EventMachine).to receive(:connect).and_wrap_original do |original, *args, &block|
                      # NOTE(review): assumes the connection URL is the fifth argument passed to EventMachine.connect — confirm against the transport
                      url = args[4]
                      uri = URI.parse(url)
                      expect(CGI::parse(uri.query)['resume']).to be_empty
                      resumed_with_clean_connection = true
                      original.call(*args, &block)
                    end

                    connection.once(:connecting) do
                      connection.once(:connected) do |state_change|
                        expect(connection.id).to_not eql(connection_id)
                        expect(resumed_with_clean_connection).to
be_truthy

                        # Wait for the channel to be attached again before checking the flag
                        wait_until(lambda { channel.attached? }) do
                          expect(channel_emitted_an_attached).to be_truthy
                          stop_reactor
                        end
                      end
                    end
                  end

                  connection.transport.unbind
                end
              end
            end
          end
        end

        context 'and subsequently fails to reconnect' do
          let(:retry_every) { 1.5 }

          let(:client_options) do
            default_options.merge(
              log_level: :none,
              disconnected_retry_timeout: retry_every,
              suspended_retry_timeout: retry_every,
              max_connection_state_ttl: 60
            )
          end

          # NOTE(review): the example title interpolates the library default retry timeout,
          # whereas the assertion below measures against retry_every (1.5) — confirm the title is intended
          it "retries every #{Ably::Realtime::Connection::DEFAULTS.fetch(:disconnected_retry_timeout)} seconds" do
            fail_if_suspended_or_failed

            stubbed_first_attempt = false

            connection.once(:connected) do
              connection.once(:disconnected) do
                connection.once(:connecting) do
                  connection.once(:disconnected) do
                    disconnected_at = Time.now.to_f
                    connection.once(:connecting) do
                      expect(Time.now.to_f - disconnected_at).to be > retry_every
                      state_history = connection.state_history.map { |transition| transition[:state].to_sym }
                      expect(state_history).to eql([:connecting, :connected, :disconnected, :connecting, :disconnected, :connecting])

                      # allow one more reconnect when the reactor is stopped
                      expect(connection.manager).to receive(:reconnect_transport)
                      stop_reactor
                    end
                  end

                  # When reconnect called simply open the transport and close immediately
                  expect(connection.manager).to receive(:reconnect_transport) do
                    # Only the first stubbed call opens and drops a transport
                    next if stubbed_first_attempt

                    connection.manager.setup_transport do
                      EventMachine.next_tick do
                        connection.transport.unbind
                        stubbed_first_attempt = true
                      end
                    end
                  end
                end
              end

              # Inject a DISCONNECTED ProtocolMessage on the incoming message bus
              protocol_message = Ably::Models::ProtocolMessage.new(action: Ably::Models::ProtocolMessage::ACTION.Disconnected.to_i)
              connection.__incoming_protocol_msgbus__.publish :protocol_message, protocol_message
            end
          end
        end
      end

      context 'when websocket transport is abruptly disconnected' do
        it 'reconnects automatically' do
          fail_if_suspended_or_failed

          connection.once(:connected) do
            connection.once(:disconnected) do
              connection.once(:connected) do
                state_history = connection.state_history.map { |transition|
transition[:state].to_sym }
                expect(state_history).to eql([:connecting, :connected, :disconnected, :connecting, :connected])
                stop_reactor
              end
            end
            # Close the underlying socket to trigger the disconnect
            connection.transport.close_connection_after_writing
          end
        end

        context 'hosts used' do
          it 'reconnects with the default host' do
            fail_if_suspended_or_failed

            connection.once(:connected) do
              connection.once(:disconnected) do
                # Record the first argument of every create_transport call (compared with the expected host below)
                hosts = []
                expect(connection).to receive(:create_transport).once.and_wrap_original do |original_method, *args, &block|
                  hosts << args[0]
                  original_method.call(*args, &block)
                end
                connection.once(:connected) do
                  # Expected host is the domain, prefixed with "<environment>-" unless the environment is production or unset
                  host = "#{"#{environment}-" if environment && environment.to_s != 'production'}#{Ably::Realtime::Client::DOMAIN}"
                  expect(hosts.first).to eql(host)
                  expect(hosts.length).to eql(1)
                  stop_reactor
                end
              end
              connection.transport.close_connection_after_writing
            end
          end
        end
      end

      context 'after successfully reconnecting and resuming' do
        it 'retains connection_id and updates the connection_key (#RTN15e, #RTN16d)' do
          connection.once(:connected) do
            previous_connection_id = connection.id
            connection.transport.close_connection_after_writing

            expect(connection).to receive(:configure_new).with(previous_connection_id, anything, anything).and_call_original

            connection.once(:connected) do
              expect(connection.key).to_not be_nil
              expect(connection.id).to eql(previous_connection_id)
              stop_reactor
            end
          end
        end

        it 'includes the error received in the connection state change from Ably but leaves the channels attached' do
          channel.attach do
            connection.transport.close_connection_after_writing

            connection.once(:connecting) do
              # Replace the existing subscriptions with one that stubs #error on every incoming ProtocolMessage
              connection.__incoming_protocol_msgbus__.unsubscribe
              connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
                allow(protocol_message).to receive(:error).and_return(Ably::Exceptions::Standard.new('Injected error'))
              end
              # Create a new message dispatcher that subscribes to ProtocolMessages after the previous subscription allowing us
              # to modify the ProtocolMessage
Ably::Realtime::Client::IncomingMessageDispatcher.new(client, connection)
            end

            connection.once(:connected) do |connection_state_change|
              # Check half a second after connecting, then finish
              EM.add_timer(0.5) do
                expect(connection_state_change.reason).to be_a(Ably::Exceptions::Standard)
                expect(connection_state_change.reason.message).to match(/Injected error/)
                expect(connection.error_reason).to be_a(Ably::Exceptions::Standard)
                expect(channel).to be_attached
                stop_reactor
              end
            end
          end
        end

        it 'retains channel subscription state' do
          # The example only finishes if the subscription registered before the disconnect still receives the message
          channel.subscribe('event') do |message|
            expect(message.data).to eql('message')
            stop_reactor
          end

          channel.attach do
            publishing_client_channel.attach do
              connection.transport.close_connection_after_writing

              connection.once(:connected) do
                publishing_client_channel.publish 'event', 'message'
              end
            end
          end
        end

        it 'executes the resume callback', api_private: true do
          channel.attach do
            connection.transport.close_connection_after_writing
            connection.on_resume do
              expect(connection).to be_connected
              stop_reactor
            end
          end
        end

        context 'when messages were published whilst the client was disconnected' do
          it 'receives the messages published whilst offline' do
            messages_received = false

            channel.subscribe('event') do |message|
              expect(message.data).to eql('message')
              messages_received = true
            end

            channel.attach do
              publishing_client_channel.attach do
                connection.transport.unsafe_off # remove all event handlers that detect socket connection state has changed
                connection.transport.close_connection_after_writing

                publishing_client_channel.publish('event', 'message') do
                  # One second after publishing, the message must still not have been received
                  EventMachine.add_timer(1) do
                    expect(messages_received).to eql(false)
                    # simulate connection dropped to re-establish web socket
                    connection.transition_state_machine :disconnected
                  end
                end

                # subsequent connection will receive message sent whilst disconnected
                connection.once(:connected) do
                  EventMachine.add_timer(1) do
                    expect(messages_received).to eql(true)
                    stop_reactor
                  end
                end
              end
            end
          end
        end

        it 'retains the client_msg_serial (#RTN15c2, #RTN15c3)' do
          last_message = nil
channel = client.channels.get("foo")

          channel.attach do
            # Remember the most recent outgoing ProtocolMessage whose action is :message
            connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
              if protocol_message.action == :message
                last_message = protocol_message
              end
            end

            channel.publish("first") do
              expect(last_message.message_serial).to eql(0)
              channel.publish("second") do
                expect(last_message.message_serial).to eql(1)
                connection.once(:connected) do
                  channel.publish("first on resumed connection") do
                    # Message serial carries on from the previous value (0, 1, then 2) on the resumed connection
                    expect(last_message.message_serial).to eql(2)
                    stop_reactor
                  end
                end

                # simulate connection dropped to re-establish web socket
                connection.transition_state_machine :disconnected
              end
            end
          end
        end
      end

      context 'when failing to resume' do
        context 'because the connection_key is not or no longer valid' do
          let(:channel) { client.channel(random_str) }

          # Closes the current transport and overwrites the connection details with fixed values
          # so that the following resume attempt is made with an invalid connection key
          def kill_connection_transport_and_prevent_valid_resume
            connection.transport.close_connection_after_writing
            connection.configure_new '0123456789abcdef', 'wVIsgTHAB1UvXh7z-1991d8586', -1 # force the resume connection key to be invalid
          end

          it 'updates the connection_id and connection_key' do
            connection.once(:connected) do
              previous_connection_id = connection.id
              previous_connection_key = connection.key

              connection.once(:connected) do
                expect(connection.key).to_not eql(previous_connection_key)
                expect(connection.id).to_not eql(previous_connection_id)
                stop_reactor
              end

              kill_connection_transport_and_prevent_valid_resume
            end
          end

          it 'issue a reattach for all attached channels and fail all message awaiting an ACK (#RTN15c3)' do
            channel_count = 10
            channels = channel_count.times.map { |index| client.channel("channel-#{index}") }
            when_all(*channels.map(&:attach)) do
              attached_channels = []
              reattaching_channels = []
              attach_protocol_messages = []
              failed_messages = []

              channels.each do |channel|
                # Each publish is expected to fail; the channel is recorded when it does
                channel.publish("foo").errback do
                  failed_messages << channel
                end
                channel.on(:attaching) do |channel_state_change|
                  error = channel_state_change.reason
expect(error.message).to match(/Unable to recover connection/i)
                  reattaching_channels << channel
                end
                channel.on(:attached) do
                  attached_channels << channel
                  # Only run the final assertions once every channel has attached again
                  next unless attached_channels.count == channel_count
                  expect(reattaching_channels.count).to eql(channel_count)
                  expect(failed_messages.count).to eql(channel_count)
                  expect(attach_protocol_messages.uniq).to match(channels.map(&:name))
                  stop_reactor
                end
              end

              # Record the channel of every outgoing ATTACH ProtocolMessage
              connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
                if protocol_message.action == :attach
                  attach_protocol_messages << protocol_message.channel
                end
              end

              kill_connection_transport_and_prevent_valid_resume
            end
          end

          it 'issue a reattach for all attaching channels and fail all queued messages (#RTN15c3)' do
            channel_count = 10
            channels = channel_count.times.map { |index| client.channel("channel-#{index}") }
            # Attach is requested here without waiting for the attaches to complete
            channels.map(&:attach)
            attached_channels = []
            attach_protocol_messages = []
            failed_messages = []

            channels.each do |channel|
              channel.publish("foo").errback do
                failed_messages << channel
              end
              channel.on(:attached) do |state_change|
                attached_channels << channel
                expect(state_change).to_not be_resumed
                # Only run the final assertions once every channel has attached
                next unless attached_channels.count == channel_count
                expect(failed_messages.count).to eql(channel_count)
                expect(attach_protocol_messages.uniq).to match(channels.map(&:name))
                stop_reactor
              end
            end

            # Record the channel of every outgoing ATTACH ProtocolMessage
            connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
              if protocol_message.action == :attach
                attach_protocol_messages << protocol_message.channel
              end
            end

            client.connection.once(:connected) do
              kill_connection_transport_and_prevent_valid_resume
            end
          end

          it 'issue a attach for all suspended channels (#RTN15c3)' do
            channel_count = 10
            channels = channel_count.times.map { |index| client.channel("channel-#{index}") }
            when_all(*channels.map(&:attach)) do
              # Force all channels into a suspended state
              # NOTE(review): the result of this `map` is discarded — `each` would express the intent
              channels.map do |channel|
                channel.transition_state_machine!
:suspended expect(channel).to be_suspended end attached_channels = [] reattaching_channels = [] attach_protocol_messages = [] channels.each do |channel| channel.on(:attaching) do reattaching_channels << channel end channel.on(:attached) do attached_channels << channel next unless attached_channels.count == channel_count expect(reattaching_channels.count).to eql(channel_count) expect(attach_protocol_messages.uniq).to match(channels.map(&:name)) stop_reactor end end connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :attach attach_protocol_messages << protocol_message.channel end end kill_connection_transport_and_prevent_valid_resume end end it 'sets the error reason on each channel' do channel.attach do channel.on(:attaching) do |state_change| expect(state_change.reason.message).to match(/Unable to recover connection/i) expect(state_change.reason.code).to eql(80008) expect(channel.error_reason.code).to eql(80008) channel.on(:attached) do |state_change| stop_reactor end end kill_connection_transport_and_prevent_valid_resume end end it 'continues to use the client_msg_serial (#RTN15c3)' do last_message = nil channel = client.channels.get("foo") connection.once(:connected) do connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :message last_message = protocol_message end end channel.publish("first") do expect(last_message.message_serial).to eql(0) channel.publish("second") do expect(last_message.message_serial).to eql(1) connection.once(:connected) do channel.publish("first on new connection") do # Message serial reset after failed resume expect(last_message.message_serial).to eql(2) stop_reactor end end kill_connection_transport_and_prevent_valid_resume end end end end end context 'as the DISCONNECTED window to resume has passed' do let(:channel) { client.channel(random_str) } def kill_connection_transport_and_prevent_valid_resume 
connection.transport.close_connection_after_writing end it 'starts a new connection automatically and does not try and resume' do connection.once(:connected) do previous_connection_id = connection.id previous_connection_key = connection.key connection.once(:connected) do expect(connection.key).to_not eql(previous_connection_key) expect(connection.id).to_not eql(previous_connection_id) stop_reactor end # Wait until next tick before stubbing otherwise liveness test may # record the stubbed last contact time as the future time EventMachine.next_tick do five_minutes_time = Time.now + 5 * 60 allow(Time).to receive(:now) { five_minutes_time } kill_connection_transport_and_prevent_valid_resume end end end end end context 'when an ERROR protocol message is received' do %w(connecting connected).each do |state| state = state.to_sym context "whilst #{state}" do context 'with a token error code in the range 40140 <= code < 40150 (#RTN14b)' do let(:client_options) { default_options.merge(use_token_auth: true) } it 'triggers a re-authentication' do connection.once(state) do current_token = client.auth.current_token_details error_message = Ably::Models::ProtocolMessage.new(action: Ably::Models::ProtocolMessage::ACTION.Error.to_i, error: { code: 40140 }) connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message connection.once(:connected) do expect(client.auth.current_token_details).to_not eql(current_token) stop_reactor end end end end context 'with an error code indicating an error other than a token failure (#RTN14g, #RTN15i)' do it 'causes the connection to fail' do connection.once(state) do connection.once(:failed) do stop_reactor end error_message = Ably::Models::ProtocolMessage.new(action: Ably::Models::ProtocolMessage::ACTION.Error.to_i, error: { code: 50000 }) connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message end end end context 'with no error code indicating an error other than a token failure (#RTN14g, #RTN15i)' do 
it 'causes the connection to fail' do connection.once(state) do connection.once(:failed) do stop_reactor end error_message = Ably::Models::ProtocolMessage.new(action: Ably::Models::ProtocolMessage::ACTION.Error.to_i) connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message end end end end end end context "whilst resuming" do context "with a token error code in the region 40140 <= code < 40150 (#{}RTN15c5)" do before do stub_const 'Ably::Models::TokenDetails::TOKEN_EXPIRY_BUFFER', 0 # allow token to be used even if about to expire stub_const 'Ably::Auth::TOKEN_DEFAULTS', Ably::Auth::TOKEN_DEFAULTS.merge(renew_token_buffer: 0) # Ensure tokens issued expire immediately after issue end let!(:four_second_token) { rest_client.auth.request_token(ttl: 4).token } let!(:normal_token) { rest_client.auth.request_token.token } let(:client_options) do default_options.merge(auth_callback: lambda do |token_params| @auth_requests ||= 0 @auth_requests += 1 case @auth_requests when 1 four_second_token when 2 normal_token end end) end it 'triggers a re-authentication and then resumes the connection' do connection.once(:connected) do connection_id = connection.id connecting_attempts = 0 connection.on(:connecting) { connecting_attempts += 1 } connection.once(:connected) do expect(@auth_requests).to eql(2) # initial + reconnect fails due to expiry & then obtains new token expect(connecting_attempts).to eql(2) # reconnect with failed token, then reconnect with successful token expect(connection.id).to eql(connection_id) stop_reactor end # Prevent token expired DISCONNECTED arriving on the transport # Instead we want to let the client lib catch a transport closed event # Then attempt to reconnect with an expired token connection.transport.__incoming_protocol_msgbus__.unsubscribe EventMachine.next_tick do # Lock the EventMachine for 4 seconds until the token has expired sleep 5 # Simulate an abrupt disconnection which will in turn resume but with an expired token 
connection.transport.close_connection_after_writing end end end end end context 'with any other error (#RTN15c4)' do it 'moves the connection to the failed state' do channel = client.channels.get("foo") channel.attach do connection.once(:failed) do |state_change| expect(state_change.reason.code).to eql(40400) expect(connection.error_reason.code).to eql(40400) expect(channel).to be_failed expect(channel.error_reason.code).to eql(40400) stop_reactor end allow(client.rest_client.auth).to receive(:key).and_return("invalid.key:secret") # Simulate an abrupt disconnection which will in turn resume with an invalid key connection.transport.close_connection_after_writing end end end end describe 'fallback host feature' do let(:retry_every_for_tests) { 0.2 } let(:max_time_in_state_for_tests) { 0.59 } let(:timeout_options) do default_options.merge( environment: :production, log_level: :none, disconnected_retry_timeout: retry_every_for_tests, suspended_retry_timeout: retry_every_for_tests, max_connection_state_ttl: max_time_in_state_for_tests ) end # Retry immediately and then wait retry_every before every subsequent attempt let(:expected_retry_attempts) { 1 + (max_time_in_state_for_tests / retry_every_for_tests).round } let(:retry_count_for_one_state) { 1 + expected_retry_attempts } # initial connect then disconnected let(:retry_count_for_all_states) { 1 + expected_retry_attempts + 1 } # initial connection, disconnected & then one suspended attempt context 'with custom realtime websocket host option' do let(:expected_host) { 'this.host.does.not.exist' } let(:client_options) { timeout_options.merge(realtime_host: expected_host) } it 'never uses a fallback host' do expect(connection).to receive(:create_transport).exactly(retry_count_for_all_states).times do |host| expect(host).to eql(expected_host) raise EventMachine::ConnectionError end connection.once(:suspended) do connection.once(:suspended) do stop_reactor end end end end context 'with custom realtime websocket port option' 
do let(:custom_port) { 666} let(:client_options) { timeout_options.merge(tls_port: custom_port) } it 'never uses a fallback host' do expect(connection).to receive(:create_transport).exactly(retry_count_for_all_states).times do |host, port| expect(port).to eql(custom_port) raise EventMachine::ConnectionError end connection.once(:suspended) do connection.once(:suspended) do stop_reactor end end end end context 'with non-production environment' do let(:environment) { 'sandbox' } let(:expected_host) { "#{environment}-#{Ably::Realtime::Client::DOMAIN}" } let(:client_options) { timeout_options.merge(environment: environment) } it 'does not use a fallback host by default' do expect(connection).to receive(:create_transport).exactly(retry_count_for_all_states).times do |host| expect(host).to eql(expected_host) raise EventMachine::ConnectionError end connection.once(:suspended) do connection.once(:suspended) do stop_reactor end end end context ':fallback_hosts_use_default is true' do let(:max_time_in_state_for_tests) { 4 } let(:fallback_hosts_used) { Array.new } let(:client_options) { timeout_options.merge(environment: environment, fallback_hosts_use_default: true) } it 'uses a fallback host on every subsequent disconnected attempt until suspended (#RTN17b, #TO3k7)' do request = 0 allow(connection).to receive(:create_transport) do |host| if request == 0 expect(host).to eql(expected_host) else fallback_hosts_used << host end request += 1 raise EventMachine::ConnectionError end connection.once(:suspended) do expect(fallback_hosts_used.uniq).to match_array(Ably::FALLBACK_HOSTS + [expected_host]) stop_reactor end end it 'does not use a fallback host if the connection connects on the default host and then later becomes disconnected', em_timeout: 25 do request = 0 allow(connection).to receive(:create_transport).and_wrap_original do |wrapped_proc, host, *args, &block| expect(host).to eql(expected_host) request += 1 wrapped_proc.call(host, *args, &block) end 
connection.on(:connected) do if request <= 2 EventMachine.add_timer(3) do # Force a disconnect connection.transport.unbind end else stop_reactor end end end end context ':fallback_hosts array is provided' do let(:max_time_in_state_for_tests) { 4 } let(:fallback_hosts) { %w(a.foo.com b.foo.com) } let(:fallback_hosts_used) { Array.new } let(:client_options) { timeout_options.merge(environment: environment, fallback_hosts: fallback_hosts) } it 'uses a fallback host on every subsequent disconnected attempt until suspended (#RTN17b, #TO3k6)' do request = 0 allow(connection).to receive(:create_transport) do |host| if request == 0 expect(host).to eql(expected_host) else fallback_hosts_used << host end request += 1 raise EventMachine::ConnectionError end connection.once(:suspended) do expect(fallback_hosts_used.uniq).to match_array(fallback_hosts + [expected_host]) stop_reactor end end end end context 'with production environment' do let(:custom_hosts) { %w(A.ably-realtime.com B.ably-realtime.com) } before do stub_const 'Ably::FALLBACK_HOSTS', custom_hosts end let(:expected_host) { Ably::Realtime::Client::DOMAIN } let(:client_options) { timeout_options.merge(environment: nil) } let(:fallback_hosts_used) { Array.new } context 'when the Internet is down' do before do allow(connection).to receive(:internet_up?).and_yield(false) end it 'never uses a fallback host' do expect(connection).to receive(:create_transport).exactly(retry_count_for_all_states).times do |host| expect(host).to eql(expected_host) raise EventMachine::ConnectionError end connection.once(:suspended) do connection.once(:suspended) do stop_reactor end end end end context 'when the Internet is up' do before do allow(connection).to receive(:internet_up?).and_yield(true) @suspended = 0 end context 'and default options' do let(:max_time_in_state_for_tests) { 2 } # allow time for 3 attempts, 2 configured fallbacks + primary host it 'uses a fallback host + the original host once on every subsequent disconnected 
attempt until suspended' do request = 0 expect(connection).to receive(:create_transport).exactly(retry_count_for_one_state).times do |host| if request == 0 expect(host).to eql(expected_host) else fallback_hosts_used << host end request += 1 raise EventMachine::ConnectionError end connection.once(:suspended) do fallback_hosts_used.pop # remove suspended attempt host expect(fallback_hosts_used.uniq).to match_array(custom_hosts + [expected_host]) stop_reactor end end it 'uses the primary host when suspended, and then every fallback host and the primary host again on every subsequent suspended attempt' do request = 0 expect(connection).to receive(:create_transport).at_least(:once) do |host| if request == 0 || request == expected_retry_attempts + 1 expect(host).to eql(expected_host) else expect(custom_hosts + [expected_host]).to include(host) fallback_hosts_used << host if @suspended > 0 end request += 1 raise EventMachine::ConnectionError end connection.on(:suspended) do @suspended += 1 if @suspended > 4 expect(fallback_hosts_used.uniq).to match_array(custom_hosts + [expected_host]) stop_reactor end end end it 'uses the correct host name for the WebSocket requests to the fallback hosts' do request = 0 expect(connection).to receive(:create_transport).at_least(:once) do |host, port, uri| if request == 0 || request == expected_retry_attempts + 1 expect(uri.hostname).to eql(expected_host) else expect(custom_hosts + [expected_host]).to include(uri.hostname) fallback_hosts_used << host if @suspended > 0 end request += 1 raise EventMachine::ConnectionError end connection.on(:suspended) do @suspended += 1 if @suspended > 4 expect(fallback_hosts_used.uniq).to match_array(custom_hosts + [expected_host]) stop_reactor end end end end context ':fallback_hosts array is provided by an empty array' do let(:max_time_in_state_for_tests) { 3 } let(:fallback_hosts) { [] } let(:hosts_used) { Array.new } let(:client_options) { timeout_options.merge(environment: 'production', fallback_hosts: 
fallback_hosts) } it 'uses a fallback host on every subsequent disconnected attempt until suspended (#RTN17b, #TO3k6)' do allow(connection).to receive(:create_transport) do |host| hosts_used << host raise EventMachine::ConnectionError end connection.once(:suspended) do expect(hosts_used.uniq.length).to eql(1) expect(hosts_used.uniq.first).to eql(expected_host) stop_reactor end end end context ':fallback_hosts array is provided' do let(:max_time_in_state_for_tests) { 3 } let(:fallback_hosts) { %w(a.foo.com b.foo.com) } let(:fallback_hosts_used) { Array.new } let(:client_options) { timeout_options.merge(environment: 'production', fallback_hosts: fallback_hosts) } it 'uses a fallback host on every subsequent disconnected attempt until suspended (#RTN17b, #TO3k6)' do request = 0 allow(connection).to receive(:create_transport) do |host| if request == 0 expect(host).to eql(expected_host) else fallback_hosts_used << host end request += 1 raise EventMachine::ConnectionError end connection.once(:suspended) do expect(fallback_hosts_used.uniq).to match_array(fallback_hosts + [expected_host]) stop_reactor end end end end end end end end