spec/acceptance/realtime/connection_spec.rb in ably-0.6.2 vs spec/acceptance/realtime/connection_spec.rb in ably-0.7.0
- old
+ new
@@ -1,181 +1,655 @@
+# encoding: utf-8
require 'spec_helper'
-describe Ably::Realtime::Connection do
- include RSpec::EventMachine
+describe Ably::Realtime::Connection, :event_machine do
let(:connection) { client.connection }
- [:json, :msgpack].each do |protocol|
- context "over #{protocol}" do
- let(:default_options) do
- { api_key: api_key, environment: environment, protocol: protocol }
+ vary_by_protocol do
+ let(:default_options) do
+ { api_key: api_key, environment: environment, protocol: protocol }
+ end
+ let(:client_options) { default_options }
+ let(:client) { Ably::Realtime::Client.new(client_options) }
+ before(:example) do
+ EventMachine.add_shutdown_hook do
+ connection.off # minimise side effects of callbacks from finished test calling stop_reactor
+ end
- let(:client) do
- Ably::Realtime::Client.new(default_options)
+ context 'intialization' do
+ it 'connects automatically' do
+ connection.on(:connected) do
+ expect(connection.state).to eq(:connected)
+ stop_reactor
+ end
- context 'with API key' do
- it 'connects automatically' do
- run_reactor do
- connection.on(:connected) do
- expect(connection.state).to eq(:connected)
- expect(client.auth.auth_params[:key_id]).to_not be_nil
- expect(client.auth.auth_params[:access_token]).to be_nil
- stop_reactor
- end
+ context 'with :connect_automatically option set to false' do
+ let(:client) do
+ Ably::Realtime::Client.new(default_options.merge(connect_automatically: false))
+ end
+ it 'does not connect automatically' do
+ EventMachine.add_timer(1) do
+ expect(connection).to be_initialized
+ stop_reactor
+ client
+ it 'connects when method #connect is called' do
+ connection.connect do
+ expect(connection).to be_connected
+ stop_reactor
+ end
+ end
- context 'with client_id resulting in token auth' do
- let(:default_options) do
- { api_key: api_key, environment: environment, protocol: protocol, client_id: SecureRandom.hex, log_level: :debug }
+ context 'with token auth' do
+ before do
+ # Reduce token expiry buffer to zero so that a token expired? predicate is exact
+ # Normally there is a buffer so that a token expiring soon is considered expired
+ stub_const 'Ably::Models::Token::TOKEN_EXPIRY_BUFFER', 0
- it 'connects automatically' do
- run_reactor do
- connection.on(:connected) do
- expect(connection.state).to eq(:connected)
- expect(client.auth.auth_params[:access_token]).to_not be_nil
- expect(client.auth.auth_params[:key_id]).to be_nil
- stop_reactor
+ context 'for renewable tokens' do
+ context 'that are valid for the duration of the test' do
+ context 'with valid pre authorised token expiring in the future' do
+ it 'uses the existing token created by Auth' do
+ client.auth.authorise(ttl: 300)
+ expect(client.auth).to_not receive(:request_token)
+ connection.once(:connected) do
+ stop_reactor
+ end
+ end
+ context 'with implicit authorisation' do
+ let(:client_options) { default_options.merge(client_id: 'force_token_auth') }
+ it 'uses the token created by the implicit authorisation' do
+ expect(client.auth).to receive(:request_token).once.and_call_original
+ connection.once(:connected) do
+ stop_reactor
+ end
+ end
+ end
- end
- end
- context 'initialization phases' do
- let(:phases) { [:initialized, :connecting, :connected] }
- let(:events_triggered) { [] }
+ context 'that expire' do
+ let(:client_options) { default_options.merge(log_level: :none) }
- it 'are triggered in order' do
- test_expectation = Proc.new do
- expect(events_triggered).to eq(phases)
- stop_reactor
+ before do
+ client.auth.authorise(ttl: ttl)
+ end
+ context 'opening a new connection' do
+ context 'with recently expired token' do
+ let(:ttl) { 2 }
+ it 'renews the token on connect' do
+ sleep ttl + 0.1
+ expect(client.auth.current_token).to be_expired
+ expect(client.auth).to receive(:authorise).once.and_call_original
+ connection.once(:connected) do
+ expect(client.auth.current_token).to_not be_expired
+ stop_reactor
+ end
+ end
+ end
+ context 'with immediately expiring token' do
+ let(:ttl) { 0.01 }
+ it 'renews the token on connect, and only makes one subsequent attempt to obtain a new token' do
+ expect(client.auth).to receive(:authorise).twice.and_call_original
+ connection.once(:disconnected) do
+ connection.once(:failed) do |error|
+ expect(error.code).to eql(40140) # token expired
+ stop_reactor
+ end
+ end
+ end
+ it 'uses the primary host for subsequent connection and auth requests' do
+ EventMachine.add_timer(1) do # wait for token to expire
+ connection.once(:disconnected) do
+ expect(client.rest_client.connection).to receive(:post).with(/requestToken$/, anything).and_call_original
+ expect(client.rest_client).to_not receive(:fallback_connection)
+ expect(client).to_not receive(:fallback_endpoint)
+ connection.once(:failed) do
+ connection.off
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+ context 'when connected with a valid non-expired token' do
+ context 'that then expires following the connection being opened' do
+ let(:ttl) { 2 }
+ let(:channel) { client.channel('test') }
+ context 'the server' do
+ it 'disconnects the client, and the client automatically renews the token and then reconnects', em_timeout: 10 do
+ expect(client.auth.current_token).to_not be_expired
+ channel.attach
+ original_token = client.auth.current_token
+ connection.once(:connected) do
+ started_at = Time.now
+ connection.once(:disconnected) do |error|
+ expect(Time.now - started_at >= ttl)
+ expect(original_token).to be_expired
+ expect(error.code).to eql(40140) # token expired
+ connection.once(:connected) do
+ expect(client.auth.current_token).to_not be_expired
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ skip 'retains connection state'
+ skip 'changes state to failed if a new token cannot be issued'
+ end
+ end
+ end
- run_reactor do
- phases.each do |phase|
- connection.on(phase) do
- events_triggered << phase
- test_expectation.call if events_triggered.length == phases.length
+ context 'for non-renewable tokens' do
+ context 'that are expired' do
+ let!(:expired_token) do
+ Ably::Realtime::Client.new(default_options).auth.request_token(ttl: 0.01)
+ end
+ context 'opening a new connection' do
+ let(:client_options) { default_options.merge(api_key: nil, token_id: expired_token.id, log_level: :none) }
+ it 'transitions state to failed', em_timeout: 10 do
+ EventMachine.add_timer(1) do # wait for token to expire
+ expect(expired_token).to be_expired
+ connection.once(:connected) { raise 'Connection should never connect as token has expired' }
+ connection.once(:failed) do
+ expect(client.connection.error_reason.code).to eql(40140)
+ stop_reactor
+ end
+ end
+ context 'when connected' do
+ skip 'transitions state to failed'
+ end
- skip '#close disconnects, closes the connection immediately and changes the connection state to closed'
+ end
- specify '#close(graceful: true) gracefully waits for the server to close the connection' do
- run_reactor(8) do
- connection.close
- connection.on(:closed) do
- expect(connection.state).to eq(:closed)
- stop_reactor
+ context 'initialization state changes' do
+ let(:phases) { [:connecting, :connected] }
+ let(:events_triggered) { [] }
+ let(:test_expectation) do
+ Proc.new do
+ expect(events_triggered).to eq(phases)
+ stop_reactor
+ end
+ end
+ def expect_ordered_phases
+ phases.each do |phase|
+ connection.on(phase) do
+ events_triggered << phase
+ test_expectation.call if events_triggered.length == phases.length
- it 'echoes a heart beat with #ping' do
- run_reactor do
- connection.on(:connected) do
- connection.ping do |time_elapsed|
- expect(time_elapsed).to be > 0
+ context 'with implicit #connect' do
+ it 'are triggered in order' do
+ expect_ordered_phases
+ end
+ end
+ context 'with explicit #connect' do
+ it 'are triggered in order' do
+ expect_ordered_phases
+ connection.connect
+ end
+ end
+ end
+ context '#connect' do
+ it 'returns a Deferrable' do
+ expect(connection.connect).to be_a(EventMachine::Deferrable)
+ stop_reactor
+ end
+ it 'calls the Deferrable callback on success' do
+ connection.connect.callback do |connection|
+ expect(connection).to be_a(Ably::Realtime::Connection)
+ expect(connection.state).to eq(:connected)
+ stop_reactor
+ end
+ end
+ context 'when already connected' do
+ it 'does nothing and no further state changes are emitted' do
+ connection.once(:connected) do
+ connection.once_state_changed { raise 'State should not have changed' }
+ 3.times { connection.connect }
+ EventMachine.add_timer(1) do
+ expect(connection).to be_connected
+ connection.off
- skip 'connects, closes gracefully and reconnects on #connect'
+ describe 'once connected' do
+ let(:connection2) { Ably::Realtime::Client.new(client_options).connection }
- it 'connects, closes the connection, and then reconnects with a new connection ID' do
- run_reactor(15) do
+ describe 'connection#id' do
+ it 'is a string' do
+ connection.connect do
+ expect(connection.id).to be_a(String)
+ stop_reactor
+ end
+ end
+ it 'is unique from the connection#key' do
+ connection.connect do
+ expect(connection.id).to_not eql(connection.key)
+ stop_reactor
+ end
+ end
+ it 'is unique for every connection' do
+ when_all(connection.connect, connection2.connect) do
+ expect(connection.id).to_not eql(connection2.id)
+ stop_reactor
+ end
+ end
+ end
+ describe 'connection#key' do
+ it 'is a string' do
+ connection.connect do
+ expect(connection.key).to be_a(String)
+ stop_reactor
+ end
+ end
+ it 'is unique from the connection#id' do
+ connection.connect do
+ expect(connection.key).to_not eql(connection.id)
+ stop_reactor
+ end
+ end
+ it 'is unique for every connection' do
+ when_all(connection.connect, connection2.connect) do
+ expect(connection.key).to_not eql(connection2.key)
+ stop_reactor
+ end
+ end
+ end
+ end
+ context 'following a previous connection being opened and closed' do
+ it 'reconnects and is provided with a new connection ID and connection key from the server' do
connection.connect do
- connection_id = connection.id
+ connection_id = connection.id
+ connection_key = connection.key
connection.close do
connection.connect do
expect(connection.id).to_not eql(connection_id)
+ expect(connection.key).to_not eql(connection_key)
+ end
- context 'failures' do
- context 'with invalid app part of the key' do
- let(:missing_key) { 'not_an_app.invalid_key_id:invalid_key_value' }
- let(:client) do
- Ably::Realtime::Client.new(default_options.merge(api_key: missing_key))
+ context '#close' do
+ it 'returns a Deferrable' do
+ connection.connect do
+ expect(connection.close).to be_a(EventMachine::Deferrable)
+ stop_reactor
+ end
+ end
+ it 'calls the Deferrable callback on success' do
+ connection.connect do
+ connection.close.callback do |connection|
+ expect(connection).to be_a(Ably::Realtime::Connection)
+ expect(connection.state).to eq(:closed)
+ stop_reactor
+ end
+ end
- it 'enters the failed state and returns a not found error' do
- run_reactor do
- connection.on(:failed) do |error|
- expect(connection.state).to eq(:failed)
- expect(error.status).to eq(404)
+ context 'when already closed' do
+ it 'does nothing and no further state changes are emitted' do
+ connection.once(:connected) do
+ connection.close do
+ connection.once_state_changed { raise 'State should not have changed' }
+ 3.times { connection.close }
+ EventMachine.add_timer(1) do
+ expect(connection).to be_closed
+ connection.off
+ end
- context 'with invalid key ID part of the key' do
- let(:invalid_key) { "#{app_id}.invalid_key_id:invalid_key_value" }
- let(:client) do
- Ably::Realtime::Client.new(default_options.merge(api_key: invalid_key))
+ context 'when connection state is' do
+ let(:events) { Hash.new }
+ def log_connection_changes
+ connection.on(:closing) { events[:closing_emitted] = true }
+ connection.on(:error) { events[:error_emitted] = true }
+ connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ events[:closed_message_from_server_received] = true if protocol_message.action == :closed
+ end
- it 'enters the failed state and returns an authorization error' do
- run_reactor do
- connection.on(:failed) do |error|
- expect(connection.state).to eq(:failed)
- expect(error.status).to eq(401)
+ context ':initialized' do
+ it 'changes the connection state to :closing and then immediately :closed without sending a ProtocolMessage CLOSE' do
+ connection.on(:closed) do
+ expect(connection.state).to eq(:closed)
+ EventMachine.add_timer(1) do # allow for all subscribers on incoming message bes
+ expect(events[:error_emitted]).to_not eql(true)
+ expect(events[:closed_message_from_server_received]).to_not eql(true)
+ expect(events[:closing_emitted]).to eql(true)
+ log_connection_changes
+ connection.close
- context 'with invalid WebSocket host' do
- let(:client) do
- Ably::Realtime::Client.new(default_options.merge(ws_host: 'non.existent.host'))
+ context ':connected' do
+ it 'changes the connection state to :closing and waits for the server to confirm connection is :closed with a ProtocolMessage' do
+ connection.on(:connected) do
+ connection.on(:closed) do
+ EventMachine.add_timer(1) do # allow for all subscribers on incoming message bus
+ expect(events[:error_emitted]).to_not eql(true)
+ expect(events[:closed_message_from_server_received]).to eql(true)
+ expect(events[:closing_emitted]).to eql(true)
+ stop_reactor
+ end
+ end
+ log_connection_changes
+ connection.close
+ end
- it 'enters the failed state and returns an authorization error' do
- run_reactor do
- connection.on(:failed) do |error|
- expect(connection.state).to eq(:failed)
- expect(error.code).to eq(80000)
- expect(error.status).to be_nil
- stop_reactor
+ context 'with an unresponsive connection' do
+ let(:stubbed_timeout) { 2 }
+ before do
+ stub_const 'Ably::Realtime::Connection::ConnectionManager::TIMEOUTS',
+ Ably::Realtime::Connection::ConnectionManager::TIMEOUTS.merge(close: stubbed_timeout)
+ connection.on(:connected) do
+ # Prevent all incoming & outgoing ProtocolMessages from being processed by the client library
+ connection.__outgoing_protocol_msgbus__.unsubscribe
+ connection.__incoming_protocol_msgbus__.unsubscribe
+ it 'force closes the connection when a :closed ProtocolMessage response is not received' do
+ connection.on(:connected) do
+ close_requested_at = Time.now
+ connection.on(:closed) do
+ expect(Time.now - close_requested_at).to be >= stubbed_timeout
+ expect(connection.state).to eq(:closed)
+ expect(events[:error_emitted]).to_not eql(true)
+ expect(events[:closed_message_from_server_received]).to_not eql(true)
+ expect(events[:closing_emitted]).to eql(true)
+ stop_reactor
+ end
+ log_connection_changes
+ connection.close
+ end
+ end
+ end
- it 'opens many connections simultaneously' do
- run_reactor(15) do
- count, connected_ids = 25, []
+ context '#ping' do
+ it 'echoes a heart beat' do
+ connection.on(:connected) do
+ connection.ping do |time_elapsed|
+ expect(time_elapsed).to be > 0
+ stop_reactor
+ end
+ end
+ end
- clients = count.times.map do
- Ably::Realtime::Client.new(default_options)
+ context 'when not connected' do
+ it 'raises an exception' do
+ expect { connection.ping }.to raise_error RuntimeError, /Cannot send a ping when connection/
+ stop_reactor
+ end
+ end
+ end
+ context 'recovery' do
+ let(:channel_name) { random_str }
+ let(:channel) { client.channel(channel_name) }
+ let(:publishing_client) do
+ Ably::Realtime::Client.new(client_options)
+ end
+ let(:publishing_client_channel) { publishing_client.channel(channel_name) }
+ let(:client_options) { default_options.merge(log_level: :fatal) }
+ before do
+ # Reconfigure client library retry periods and timeouts so that tests run quickly
+ stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG',
+ Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge(
+ disconnected: { retry_every: 0.1, max_time_in_state: 0.2 },
+ suspended: { retry_every: 0.1, max_time_in_state: 0.2 },
+ )
+ end
+ describe '#recovery_key' do
+ def self.available_states
+ [:connecting, :connected, :disconnected, :suspended, :failed]
+ end
+ let(:available_states) { self.class.available_states}
+ let(:states) { Hash.new }
+ let(:client_options) { default_options.merge(log_level: :none) }
+ it 'is composed of connection id and serial that is kept up to date with each message sent' do
+ connection.on(:connected) do
+ expected_serial = -1
+ expect(connection.id).to_not be_nil
+ expect(connection.serial).to eql(expected_serial)
+ client.channel('test').attach do |channel|
+ channel.publish('event', 'data') do
+ expected_serial += 1 # attach message received
+ expect(connection.serial).to eql(expected_serial)
+ channel.publish('event', 'data') do
+ expected_serial += 1 # attach message received
+ expect(connection.serial).to eql(expected_serial)
+ stop_reactor
+ end
+ end
+ end
+ end
- clients.each do |client|
- client.connection.on(:connected) do
- connected_ids << client.connection.id
+ it "is available when connection is in one of the states: #{available_states.join(', ')}" do
+ connection.once(:connected) do
+ allow(client).to receive(:endpoint).and_return(
+ URI::Generic.build(
+ scheme: 'wss',
+ host: 'this.host.does.not.exist.com'
+ )
+ )
- if connected_ids.count == 25
- expect(connected_ids.uniq.count).to eql(25)
+ connection.transition_state_machine! :disconnected
+ end
+ available_states.each do |state|
+ connection.on(state) do
+ states[state.to_sym] = true if connection.recovery_key
+ end
+ end
+ connection.once(:failed) do
+ expect(states.keys).to match_array(available_states)
+ stop_reactor
+ end
+ end
+ it 'is nil when connection is explicitly CLOSED' do
+ connection.once(:connected) do
+ connection.close do
+ expect(connection.recovery_key).to be_nil
+ stop_reactor
+ end
+ end
+ end
+ end
+ context "opening a new connection using a recently disconnected connection's #recovery_key" do
+ context 'connection#id and connection#key after recovery' do
+ let(:client_options) { default_options.merge(log_level: :none) }
+ it 'remain the same' do
+ previous_connection_id = nil
+ previous_connection_key = nil
+ connection.once(:connected) do
+ previous_connection_id = connection.id
+ previous_connection_key = connection.key
+ connection.transition_state_machine! :failed
+ end
+ connection.once(:failed) do
+ recover_client = Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key))
+ recover_client.connection.on(:connected) do
+ expect(recover_client.connection.key).to eql(previous_connection_key)
+ expect(recover_client.connection.id).to eql(previous_connection_id)
+ end
+ context 'when messages have been sent whilst the old connection is disconnected' do
+ describe 'the new connection' do
+ let(:client_options) { default_options.merge(log_level: :none) }
+ it 'recovers server-side queued messages' do
+ channel.attach do |message|
+ connection.transition_state_machine! :failed
+ end
+ connection.on(:failed) do
+ publishing_client_channel.publish('event', 'message') do
+ recover_client = Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key))
+ recover_client.channel(channel_name).attach do |recover_client_channel|
+ recover_client_channel.subscribe('event') do |message|
+ expect(message.data).to eql('message')
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+ context 'with :recover option' do
+ context 'with invalid syntax' do
+ let(:invaid_client_options) { default_options.merge(recover: 'invalid') }
+ it 'raises an exception' do
+ expect { Ably::Realtime::Client.new(invaid_client_options) }.to raise_error ArgumentError, /Recover/
+ stop_reactor
+ end
+ end
+ context 'with invalid value' do
+ let(:client_options) { default_options.merge(recover: 'invalid:key', log_level: :fatal) }
+ skip 'triggers an error on the connection object, sets the #error_reason and connects anyway' do
+ connection.on(:error) do |error|
+ expect(connection.state).to eq(:connected)
+ expect(connection.error_reason.message).to match(/Recover/)
+ expect(connection.error_reason).to eql(error)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ context 'with many connections simultaneously', em_timeout: 15 do
+ let(:connection_count) { 40 }
+ let(:connection_ids) { [] }
+ let(:connection_keys) { [] }
+ it 'opens each with a unique connection#id and connection#key' do
+ connection_count.times.map do
+ Ably::Realtime::Client.new(client_options)
+ end.each do |client|
+ client.connection.on(:connected) do
+ connection_ids << client.connection.id
+ connection_keys << client.connection.key
+ next unless connection_ids.count == connection_count
+ expect(connection_ids.uniq.count).to eql(connection_count)
+ expect(connection_keys.uniq.count).to eql(connection_count)
+ stop_reactor
+ end
+ end
+ end
+ end
+ context 'when a state transition is unsupported' do
+ let(:client_options) { default_options.merge(log_level: :none) } # silence FATAL errors
+ it 'emits a StateChangeError' do
+ connection.connect do
+ connection.transition_state_machine :initialized
+ end
+ connection.on(:error) do |error|
+ expect(error).to be_a(Ably::Exceptions::StateChangeError)
+ stop_reactor