spec/acceptance/realtime/connection_spec.rb in ably-0.8.5 vs spec/acceptance/realtime/connection_spec.rb in ably-0.8.6
- old
+ new
@@ -82,92 +82,131 @@
end
end
end
context 'that expire' do
- let(:client_options) { default_options.merge(log_level: :none) }
+ let(:client_options) { default_options.merge(log_level: :error) }
before do
expect(client.rest_client.time.to_f).to be_within(2).of(Time.now.to_i), "Local clock is out of sync with Ably"
end
before do
# Ensure tokens issued expire immediately after issue
@original_renew_token_buffer = Ably::Auth::TOKEN_DEFAULTS.fetch(:renew_token_buffer)
stub_const 'Ably::Auth::TOKEN_DEFAULTS', Ably::Auth::TOKEN_DEFAULTS.merge(renew_token_buffer: 0)
-
- # Authorise synchronously to ensure token has been issued
- client.auth.authorise_sync(ttl: ttl)
end
let(:original_renew_token_buffer) { @original_renew_token_buffer }
context 'opening a new connection' do
- context 'with recently expired token' do
+ context 'with almost expired tokens' do
+ before do
+ # Authorise synchronously to ensure token has been issued
+ client.auth.authorise_sync(ttl: ttl)
+ end
+
let(:ttl) { 2 }
- it 'renews the token on connect without changing connection state' do
- connection.once(:connecting) do
- sleep ttl + 0.1
- expect(client.auth.current_token_details).to be_expired
- expect(client.rest_client.auth).to receive(:authorise).at_least(:once).and_call_original
- connection.once(:connected) do
- expect(client.auth.current_token_details).to_not be_expired
+ it 'renews token every time after it expires' do
+ started_at = Time.now.to_f
+ connected_times = 0
+ disconnected_times = 0
+ connection.on(:connected) do
+ connected_times += 1
+ end
+ connection.on(:disconnected) do
+ disconnected_times += 1
+ if disconnected_times == 3
+ expect(connected_times).to eql(3)
+ expect(Time.now.to_f - started_at).to be > ttl * 3
+ expect(Time.now.to_f - started_at).to be < (ttl * 2) * 3
stop_reactor
end
- connection.once_state_changed do
- raise "Invalid state #{connection.state}" unless connection.state == :connected
- end
end
end
end
- context 'with immediately expiring token' do
+ context 'with immediately expired token' do
let(:ttl) { 0.001 }
+ let(:auth_requests) { [] }
+ let(:token_callback) do
+ Proc.new do
+ auth_requests << Time.now
+ Ably::Rest::Client.new(default_options).auth.request_token(ttl: ttl).token
+ end
+ end
+ let(:client_options) { default_options.merge(auth_callback: token_callback) }
- it 'renews the token on connect, and only makes one subsequent attempt to obtain a new token' do
- expect(client.rest_client.auth).to receive(:authorise).at_least(:twice).and_call_original
+ it 'renews the token on connect, and makes one immediate subsequent attempt to obtain a new token' do
+ started_at = Time.now.to_f
connection.once(:disconnected) do
- connection.once(:failed) do |connection_state_change|
+ connection.once(:disconnected) do |connection_state_change|
expect(connection_state_change.reason.code).to eql(40140) # token expired
+ expect(Time.now.to_f - started_at).to be < 1000
+ expect(auth_requests.count).to eql(2)
stop_reactor
end
end
end
- it 'uses the primary host for subsequent connection and auth requests' do
- EventMachine.add_timer(1) do # wait for token to expire
+ context 'when disconnected_retry_timeout is 0.5 seconds' do
+ let(:client_options) { default_options.merge(disconnected_retry_timeout: 0.5, auth_callback: token_callback, log_level: :error) }
+
+ it 'renews the token on connect, and continues to attempt renew based on the retry schedule' do
+ started_at = Time.now.to_f
+ disconnect_count = 0
+ connection.on(:disconnected) do |connection_state_change|
+ expect(connection_state_change.reason.code).to eql(40140) # token expired
+ disconnect_count += 1
+ if disconnect_count == 6
+ expect(Time.now.to_f - started_at).to be > 4 * 0.5 # at least 4 0.5 second pauses should have happened
+ expect(Time.now.to_f - started_at).to be < 9 # allow 1.5 seconds for each authentication cycle
+ stop_reactor
+ end
+ end
+ end
+ end
+
+ context 'using implicit token auth' do
+ let(:client_options) { default_options.merge(use_token_auth: true, token_params: { ttl: ttl }) }
+
+ before do
+ stub_const 'Ably::Models::TokenDetails::TOKEN_EXPIRY_BUFFER', -10 # ensure client lib thinks token is still valid
+ end
+
+ it 'uses the primary host for subsequent connection and auth requests' do
connection.once(:disconnected) do
expect(client.rest_client.connection).to receive(:post).
with(/requestToken$/, anything).
- at_least(:once).
+ exactly(:once).
and_call_original
expect(client.rest_client).to_not receive(:fallback_connection)
expect(client).to_not receive(:fallback_endpoint)
- connection.once(:failed) do
- connection.off
+ connection.once(:disconnected) do
stop_reactor
end
end
end
end
end
end
context 'when connected with a valid non-expired token' do
context 'that then expires following the connection being opened' do
- let(:ttl) { 5 }
- let(:channel) { client.channel(random_str) }
+ let(:ttl) { 5 }
+ let(:channel_name) { random_str }
+ let(:channel) { client.channel(channel_name) }
+ let(:client_options) { default_options.merge(use_token_auth: true, token_params: { ttl: ttl }) }
context 'the server' do
it 'disconnects the client, and the client automatically renews the token and then reconnects', em_timeout: 15 do
- original_token = client.auth.current_token_details
- expect(original_token).to_not be_expired
-
connection.once(:connected) do
+ original_token = client.auth.current_token_details
+ expect(original_token).to_not be_expired
started_at = Time.now
connection.once(:disconnected) do |connection_state_change|
expect(connection_state_change.reason.code).to eq(40140) # Token expired
# Token has expired, so now ensure it is not used again
@@ -187,12 +226,90 @@
channel.attach
end
end
- skip 'retains connection state'
- skip 'changes state to failed if a new token cannot be issued'
+ context 'connection state' do
+ let(:ttl) { 4 }
+ let(:auth_requests) { [] }
+ let(:token_callback) do
+ Proc.new do
+ sleep 2
+ auth_requests << Time.now
+ Ably::Rest::Client.new(default_options).auth.request_token(ttl: ttl).token
+ end
+ end
+ let(:client_options) { default_options.merge(auth_callback: token_callback) }
+ let(:publishing_client) { auto_close Ably::Realtime::Client.new(default_options) }
+ let(:publishing_channel) { publishing_client.channels.get(channel_name) }
+ let(:messages_received) { [] }
+
+ def publish_and_check_first_disconnect
+ 10.times.each { |index| publishing_channel.publish('event', index.to_s) }
+ channel.subscribe('event') do |message|
+ messages_received << message.data.to_i
+ if messages_received.count == 10
+ expect(messages_received).to match(10.times)
+ expect(auth_requests.count).to eql(2)
+ EventMachine.add_timer(1) do
+ channel.unsubscribe 'event'
+ yield
+ end
+ end
+ end
+ end
+
+ def publish_and_check_second_disconnect
+ 10.times.each { |index| publishing_channel.publish('event', (index + 10).to_s) }
+ channel.subscribe('event') do |message|
+ messages_received << message.data.to_i
+ if messages_received.count == 20
+ expect(messages_received).to match(20.times)
+ expect(auth_requests.count).to eql(3)
+ stop_reactor
+ end
+ end
+ end
+
+ it 'retains messages published when disconnected twice during authentication', em_timeout: 20 do
+ publishing_channel.attach do
+ channel.attach do
+ connection.once(:disconnected) do
+ publish_and_check_first_disconnect do
+ connection.once(:disconnected) do
+ publish_and_check_second_disconnect
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+
+ context 'and subsequent token is invalid' do
+ let(:ttl) { 2 }
+ let(:token_callback) do
+ Proc.new do
+ if @token_issued
+ "#{app_id}.invalid-token-invalid-token-invalid-token"
+ else
+ @token_issued = true
+ Ably::Rest::Client.new(default_options).auth.request_token(ttl: ttl).token
+ end
+ end
+ end
+ let(:client_options) { default_options.merge(auth_callback: token_callback, log_level: :none) }
+
+ it 'transitions the connection to the failed state' do
+ connection.once(:disconnected) do
+ connection.once(:failed) do
+ expect(connection.error_reason.code).to eql(40101)
+ stop_reactor
+ end
+ end
+ end
+ end
end
end
end
end
@@ -226,10 +343,54 @@
context 'when connected' do
skip 'transitions state to failed'
end
end
end
+
+ context 'with opaque token string that contain an implicit client_id' do
+ let(:client_options) { default_options.merge(token: token_string, key: nil) }
+ let(:rest_auth_client) { Ably::Rest::Client.new(default_options.merge(key: api_key)) }
+ let(:token_string) { rest_auth_client.auth.request_token(client_id: client_id).token }
+
+ context 'string' do
+ let(:client_id) { random_str }
+
+ it 'sets the Client#client_id and Auth#client_id once CONNECTED' do
+ expect(client.client_id).to be_nil
+ client.connection.once(:connected) do
+ expect(client.client_id).to eql(client_id)
+ stop_reactor
+ end
+ end
+
+ context 'that is incompatible with the current client client_id' do
+ let(:client_id) { random_str }
+ let(:client_options) { default_options.merge(client_id: 'incompatible', token: token_string, key: nil, log_level: :none) }
+
+ it 'fails the connection' do
+ expect(client.client_id).to eql('incompatible')
+ client.connection.once(:failed) do
+ expect(client.client_id).to eql('incompatible')
+ expect(client.connection.error_reason.code).to eql(40101) # Invalid clientId for credentials
+ stop_reactor
+ end
+ end
+ end
+ end
+
+ context 'wildcard' do
+ let(:client_id) { '*' }
+
+ it 'configures the Client#client_id and Auth#client_id with a wildcard once CONNECTED' do
+ expect(client.client_id).to be_nil
+ client.connection.once(:connected) do
+ expect(client.client_id).to eql('*')
+ stop_reactor
+ end
+ end
+ end
+ end
end
end
context 'initialization state changes' do
let(:phases) { [:connecting, :connected] }
@@ -275,10 +436,54 @@
expect(connection.state).to eq(:connected)
stop_reactor
end
end
+ it 'calls the provided block on success even if state changes to disconnected first' do
+ been_disconnected = false
+
+ connection.once(:disconnected) do
+ been_disconnected = true
+ end
+ connection.once(:connecting) do
+ close_if_transport_available = proc do
+ EventMachine.add_timer(0.001) do
+ if connection.transport
+ connection.transport.close_connection_after_writing
+ else
+ close_if_transport_available.call
+ end
+ end
+ end
+ close_if_transport_available.call
+ end
+
+ connection.connect do
+ expect(connection.state).to eq(:connected)
+ expect(been_disconnected).to be_truthy
+ stop_reactor
+ end
+ end
+
+ context 'with invalid auth details' do
+ let(:client_options) { default_options.merge(key: 'this.is:invalid', log_level: :none) }
+
+ it 'calls the Deferrable errback only once on connection failure' do
+ errback_called = false
+ connection.connect.errback do
+ expect(connection.state).to eq(:failed)
+
+ raise 'Errback already called' if errback_called
+ errback_called = true
+
+ connection.connect.errback do
+ EventMachine.add_timer(0.5) { stop_reactor }
+ end
+ end
+ end
+ end
+
context 'when already connected' do
it 'does nothing and no further state changes are emitted' do
connection.once(:connected) do
connection.once_state_changed { raise 'State should not have changed' }
3.times { connection.connect }
@@ -507,16 +712,14 @@
connection.close
end
end
context 'with an unresponsive connection' do
- let(:stubbed_timeout) { 2 }
+ let(:custom_timeout) { 2 }
+ let(:client_options) { default_options.merge(realtime_request_timeout: custom_timeout) }
before do
- stub_const 'Ably::Realtime::Connection::ConnectionManager::TIMEOUTS',
- Ably::Realtime::Connection::ConnectionManager::TIMEOUTS.merge(close: stubbed_timeout)
-
connection.on(:connected) do
# Prevent all incoming & outgoing ProtocolMessages from being processed by the client library
connection.__outgoing_protocol_msgbus__.unsubscribe
connection.__incoming_protocol_msgbus__.unsubscribe
end
@@ -525,11 +728,11 @@
it 'force closes the connection when a :closed ProtocolMessage response is not received' do
connection.on(:connected) do
close_requested_at = Time.now
connection.on(:closed) do
- expect(Time.now - close_requested_at).to be >= stubbed_timeout
+ expect(Time.now - close_requested_at).to be >= custom_timeout
expect(connection.state).to eq(:closed)
expect(events[:error_emitted]).to_not eql(true)
expect(events[:closed_message_from_server_received]).to_not eql(true)
expect(events[:closing_emitted]).to eql(true)
stop_reactor
@@ -569,37 +772,59 @@
end
connection.ping { raise 'Forced exception' }
end
end
end
+
+ context 'when ping times out' do
+ let(:client_options) { default_options.merge(log_level: :error) }
+
+ it 'logs a warning' do
+ connection.once(:connected) do
+ allow(connection).to receive(:defaults).and_return(connection.defaults.merge(realtime_request_timeout: 0.0001))
+ expect(connection.logger).to receive(:warn).with(/Ping timed out/) do
+ stop_reactor
+ end
+ connection.ping
+ end
+ end
+
+ it 'yields to the block with a nil value' do
+ connection.once(:connected) do
+ allow(connection).to receive(:defaults).and_return(connection.defaults.merge(realtime_request_timeout: 0.0001))
+ connection.ping do |time_elapsed|
+ expect(time_elapsed).to be_nil
+ stop_reactor
+ end
+ end
+ end
+ end
end
context 'recovery' do
let(:channel_name) { random_str }
let(:channel) { client.channel(channel_name) }
let(:publishing_client) do
auto_close Ably::Realtime::Client.new(client_options)
end
let(:publishing_client_channel) { publishing_client.channel(channel_name) }
- let(:client_options) { default_options.merge(log_level: :fatal) }
- before do
- # Reconfigure client library retry periods and timeouts so that tests run quickly
- stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG',
- Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge(
- disconnected: { retry_every: 0.1, max_time_in_state: 0.2 },
- suspended: { retry_every: 0.1, max_time_in_state: 0.2 },
- )
+ let(:client_options) do
+ default_options.merge(
+ log_level: :none,
+ disconnected_retry_timeout: 0.1,
+ suspended_retry_timeout: 0.1,
+ connection_state_ttl: 0.2
+ )
end
describe '#recovery_key' do
def self.available_states
[:connecting, :connected, :disconnected, :suspended, :failed]
end
let(:available_states) { self.class.available_states }
let(:states) { Hash.new }
- let(:client_options) { default_options.merge(log_level: :none) }
let(:channel) { client.channel(random_str) }
it 'is composed of connection key and serial that is kept up to date with each message ACK received' do
connection.on(:connected) do
expected_serial = -1
@@ -639,10 +864,15 @@
connection.on(state) do
states[state.to_sym] = true if connection.recovery_key
end
end
+ connection.once(:suspended) do
+ error_message = Ably::Models::ProtocolMessage.new(action: 9, error: { message: 'force failure' })
+ connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message
+ end
+
connection.once(:failed) do
expect(states.keys).to match_array(available_states)
stop_reactor
end
end
@@ -657,12 +887,10 @@
end
end
context "opening a new connection using a recently disconnected connection's #recovery_key" do
context 'connection#id and connection#key after recovery' do
- let(:client_options) { default_options.merge(log_level: :none) }
-
it 'remains the same' do
previous_connection_id = nil
previous_connection_key = nil
connection.once(:connected) do
@@ -699,12 +927,10 @@
end
end
context 'when messages have been sent whilst the old connection is disconnected' do
describe 'the new connection' do
- let(:client_options) { default_options.merge(log_level: :none) }
-
it 'recovers server-side queued messages' do
channel.attach do
connection.transition_state_machine! :failed
end
@@ -772,11 +998,11 @@
let(:connection_ids) { [] }
let(:connection_keys) { [] }
it 'opens each with a unique connection#id and connection#key' do
connection_count.times.map do
- Ably::Realtime::Client.new(client_options)
+ auto_close Ably::Realtime::Client.new(client_options)
end.each do |client|
client.connection.on(:connected) do
connection_ids << client.connection.id
connection_keys << client.connection.key
next unless connection_ids.count == connection_count
@@ -941,18 +1167,16 @@
end
end
end
context 'when connection enters the :suspended state' do
- let(:client_options) { default_options.merge(:log_level => :fatal) }
-
- before do
- # Reconfigure client library retry periods so that client stays in suspended state
- stub_const 'Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG',
- Ably::Realtime::Connection::ConnectionManager::CONNECT_RETRY_CONFIG.merge(
- disconnected: { retry_every: 0.01, max_time_in_state: 0.05 },
- suspended: { retry_every: 60, max_time_in_state: 60 }
- )
+ let(:client_options) do
+ default_options.merge(
+ log_level: :fatal,
+ disconnected_retry_timeout: 0.02,
+ suspended_retry_timeout: 60,
+ connection_state_ttl: 0.05
+ )
end
it 'detaches the channels and prevents publishing of messages on those channels' do
channel.attach do
channel.once(:detached) do