spec/acceptance/realtime/connection_spec.rb in ably-0.8.15 vs spec/acceptance/realtime/connection_spec.rb in ably-1.0.0
- old
+ new
@@ -58,13 +58,14 @@
let(:original_token_expiry_buffer) { @original_token_expiry_buffer }
context 'for renewable tokens' do
context 'that are valid for the duration of the test' do
- context 'with valid pre authorised token expiring in the future' do
+ context 'with valid pre authorized token expiring in the future' do
+ let(:client_options) { default_options.merge(use_token_auth: true) }
it 'uses the existing token created by Auth' do
- client.auth.authorise(ttl: 300)
+ client.auth.authorize(ttl: 300)
expect(client.auth).to_not receive(:request_token)
connection.once(:connected) do
stop_reactor
end
end
@@ -99,12 +100,12 @@
let(:original_renew_token_buffer) { @original_renew_token_buffer }
context 'opening a new connection' do
context 'with almost expired tokens' do
before do
- # Authorise synchronously to ensure token has been issued
- client.auth.authorise_sync(ttl: ttl)
+ # Authorize synchronously to ensure token has been issued
+ client.auth.authorize_sync(ttl: ttl)
end
let(:ttl) { 2 }
it 'renews token every time after it expires' do
@@ -135,11 +136,11 @@
Ably::Rest::Client.new(default_options).auth.request_token(ttl: ttl).token
end
end
let(:client_options) { default_options.merge(auth_callback: token_callback) }
- it 'renews the token on connect, and makes one immediate subsequent attempt to obtain a new token' do
+ it 'renews the token on connect, and makes one immediate subsequent attempt to obtain a new token (#RSA4b)' do
started_at = Time.now.to_f
connection.once(:disconnected) do
connection.once(:disconnected) do |connection_state_change|
expect(connection_state_change.reason.code).to eql(40142) # token expired
expect(Time.now.to_f - started_at).to be < 1000
@@ -148,46 +149,58 @@
end
end
end
context 'when disconnected_retry_timeout is 0.5 seconds' do
- let(:client_options) { default_options.merge(disconnected_retry_timeout: 0.5, auth_callback: token_callback, log_level: :error) }
+ let(:client_options) { default_options.merge(disconnected_retry_timeout: 0.5, auth_callback: token_callback) }
it 'renews the token on connect, and continues to attempt renew based on the retry schedule' do
- started_at = Time.now.to_f
disconnect_count = 0
+ first_disconnected_at = nil
connection.on(:disconnected) do |connection_state_change|
+ first_disconnected_at ||= begin
+ Time.now.to_f
+ end
expect(connection_state_change.reason.code).to eql(40142) # token expired
- disconnect_count += 1
- if disconnect_count == 6
- expect(Time.now.to_f - started_at).to be > 4 * 0.5 # at least 4 0.5 second pauses should have happened
- expect(Time.now.to_f - started_at).to be < 9 # allow 1.5 seconds for each authentication cycle
+ if disconnect_count == 4 # 3 attempts to reconnect after initial
+ # First disconnect reattempts immediately as part of connect sequence
+ # Second disconnect reattempt immediately as part of disconnected retry sequence
+ # Following two take 0.5 second each
+ # Not convinced two immediate retries is necessary, but not worth engineering effort to fix given
+ # it's only one extra retry
+ expect(Time.now.to_f - first_disconnected_at).to be > 2 * 0.5
+ expect(Time.now.to_f - first_disconnected_at).to be < 9 # allow 1.5 seconds for each authentication cycle
stop_reactor
end
+ disconnect_count += 1
end
end
end
context 'using implicit token auth' do
- let(:client_options) { default_options.merge(use_token_auth: true, token_params: { ttl: ttl }) }
+ let(:client_options) { default_options.merge(use_token_auth: true, default_token_params: { ttl: ttl }) }
before do
stub_const 'Ably::Models::TokenDetails::TOKEN_EXPIRY_BUFFER', -10 # ensure client lib thinks token is still valid
end
it 'uses the primary host for subsequent connection and auth requests' do
connection.once(:disconnected) do
expect(client.rest_client.connection).to receive(:post).
with(/requestToken$/, anything).
- exactly(:once).
+ exactly(:twice). # it retries an expired token request immediately
and_call_original
expect(client.rest_client).to_not receive(:fallback_connection)
expect(client).to_not receive(:fallback_endpoint)
+ # Connection will go into :disconnected, then back to
+ # :connecting, then :disconnected again
connection.once(:disconnected) do
- stop_reactor
+ connection.once(:disconnected) do
+ stop_reactor
+ end
end
end
end
end
end
@@ -196,11 +209,11 @@
context 'when connected with a valid non-expired token' do
context 'that then expires following the connection being opened' do
let(:ttl) { 5 }
let(:channel_name) { random_str }
let(:channel) { client.channel(channel_name) }
- let(:client_options) { default_options.merge(use_token_auth: true, token_params: { ttl: ttl }) }
+ let(:client_options) { default_options.merge(use_token_auth: true, default_token_params: { ttl: ttl }) }
context 'the server' do
it 'disconnects the client, and the client automatically renews the token and then reconnects', em_timeout: 15 do
connection.once(:connected) do
original_token = client.auth.current_token_details
@@ -227,10 +240,11 @@
channel.attach
end
end
context 'connection state' do
+ let(:publish_count) { 10 }
let(:ttl) { 4 }
let(:auth_requests) { [] }
let(:token_callback) do
Proc.new do
sleep 2
@@ -241,44 +255,40 @@
let(:client_options) { default_options.merge(auth_callback: token_callback) }
let(:publishing_client) { auto_close Ably::Realtime::Client.new(default_options) }
let(:publishing_channel) { publishing_client.channels.get(channel_name) }
let(:messages_received) { [] }
- def publish_and_check_first_disconnect
- 10.times.each { |index| publishing_channel.publish('event', index.to_s) }
+ def publish_and_check_disconnect(options = {})
+ iteration = options.fetch(:iteration) { 1 }
+ total_expected = publish_count * iteration
+ publish_count.times.each { |index| publishing_channel.publish('event', (total_expected - publish_count + index).to_s) }
channel.subscribe('event') do |message|
messages_received << message.data.to_i
- if messages_received.count == 10
- expect(messages_received).to match(10.times)
- expect(auth_requests.count).to eql(2)
+ if messages_received.count == total_expected
+ expect(messages_received).to match(total_expected.times)
+ expect(auth_requests.count).to eql(iteration + 1)
EventMachine.add_timer(1) do
channel.unsubscribe 'event'
yield
end
end
end
end
- def publish_and_check_second_disconnect
- 10.times.each { |index| publishing_channel.publish('event', (index + 10).to_s) }
- channel.subscribe('event') do |message|
- messages_received << message.data.to_i
- if messages_received.count == 20
- expect(messages_received).to match(20.times)
- expect(auth_requests.count).to eql(3)
- stop_reactor
- end
- end
- end
-
- it 'retains messages published when disconnected twice during authentication', em_timeout: 20 do
+ it 'retains messages published when disconnected three times during authentication', em_timeout: 30 do
publishing_channel.attach do
channel.attach do
connection.once(:disconnected) do
- publish_and_check_first_disconnect do
+ publish_and_check_disconnect(iteration: 1) do
connection.once(:disconnected) do
- publish_and_check_second_disconnect
+ publish_and_check_disconnect(iteration: 2) do
+ connection.once(:disconnected) do
+ publish_and_check_disconnect(iteration: 3) do
+ stop_reactor
+ end
+ end
+ end
end
end
end
end
end
@@ -320,17 +330,18 @@
end
let!(:expired_token_details) do
# Request a token synchronously
token_client = auto_close Ably::Realtime::Client.new(default_options)
- token_client.auth.request_token_sync(ttl: 0.01)
+ token_client.auth.request_token_sync(ttl: ttl)
end
context 'opening a new connection' do
let(:client_options) { default_options.merge(key: nil, token: expired_token_details.token, log_level: :none) }
+ let(:ttl) { 0.01 }
- it 'transitions state to failed', em_timeout: 10 do
+ it 'transitions state to failed (#RSA4a)', em_timeout: 10 do
EventMachine.add_timer(1) do # wait for token to expire
expect(expired_token_details).to be_expired
connection.once(:connected) { raise 'Connection should never connect as token has expired' }
connection.once(:failed) do
expect(client.connection.error_reason.code).to eql(40142)
@@ -339,11 +350,21 @@
end
end
end
context 'when connected' do
- skip 'transitions state to failed'
+ let(:client_options) { default_options.merge(key: nil, token: expired_token_details.token, log_level: :none) }
+ let(:ttl) { 4 }
+
+ it 'transitions state to failed (#RSA4a)' do
+ connection.once(:connected) do
+ connection.once(:failed) do
+ expect(client.connection.error_reason.code).to eql(40142)
+ stop_reactor
+ end
+ end
+ end
end
end
end
context 'with opaque token string that contain an implicit client_id' do
@@ -577,15 +598,17 @@
end
end
end
context 'when closing' do
- it 'raises an exception before the connection is closed' do
+ it 'fails the deferrable before the connection is closed' do
connection.connect do
connection.once(:closing) do
- expect { connection.connect }.to raise_error Ably::Exceptions::InvalidStateChange
- stop_reactor
+ connection.connect.errback do |error|
+ expect(error).to be_a(Ably::Exceptions::InvalidStateChange)
+ stop_reactor
+ end
end
connection.close
end
end
end
@@ -603,14 +626,16 @@
context 'when a message is sent but the ACK has not yet been received' do
it 'the sent message msgSerial is 0 but the connection serial remains at -1' do
channel.attach do
connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
- connection.__outgoing_protocol_msgbus__.unsubscribe
- expect(protocol_message['msgSerial']).to eql(0)
- expect(connection.serial).to eql(-1)
- stop_reactor
+ if protocol_message.action == :message
+ connection.__outgoing_protocol_msgbus__.unsubscribe
+ expect(protocol_message['msgSerial']).to eql(0)
+ expect(connection.serial).to eql(-1)
+ stop_reactor
+ end
end
channel.publish('event', 'data')
end
end
end
@@ -676,11 +701,10 @@
context 'when connection state is' do
let(:events) { Hash.new }
def log_connection_changes
connection.on(:closing) { events[:closing_emitted] = true }
- connection.on(:error) { events[:error_emitted] = true }
connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
events[:closed_message_from_server_received] = true if protocol_message.action == :closed
end
end
@@ -689,11 +713,10 @@
it 'changes the connection state to :closing and then immediately :closed without sending a ProtocolMessage CLOSE' do
connection.on(:closed) do
expect(connection.state).to eq(:closed)
EventMachine.add_timer(1) do # allow for all subscribers on incoming message bes
- expect(events[:error_emitted]).to_not eql(true)
expect(events[:closed_message_from_server_received]).to_not eql(true)
expect(events[:closing_emitted]).to eql(true)
stop_reactor
end
end
@@ -706,11 +729,10 @@
context ':connected' do
it 'changes the connection state to :closing and waits for the server to confirm connection is :closed with a ProtocolMessage' do
connection.on(:connected) do
connection.on(:closed) do
EventMachine.add_timer(1) do # allow for all subscribers on incoming message bus
- expect(events[:error_emitted]).to_not eql(true)
expect(events[:closed_message_from_server_received]).to eql(true)
expect(events[:closing_emitted]).to eql(true)
stop_reactor
end
end
@@ -737,63 +759,162 @@
close_requested_at = Time.now
connection.on(:closed) do
expect(Time.now - close_requested_at).to be >= custom_timeout
expect(connection.state).to eq(:closed)
- expect(events[:error_emitted]).to_not eql(true)
expect(events[:closed_message_from_server_received]).to_not eql(true)
expect(events[:closing_emitted]).to eql(true)
stop_reactor
end
log_connection_changes
- connection.close
+ EventMachine.next_tick { connection.close }
end
end
end
end
end
end
context '#ping' do
- it 'echoes a heart beat' do
+ it 'echoes a heart beat (#RTN13a)' do
connection.on(:connected) do
connection.ping do |time_elapsed|
expect(time_elapsed).to be > 0
+ expect(time_elapsed).to be < 3
stop_reactor
end
end
end
- context 'when not connected' do
- it 'raises an exception' do
- expect { connection.ping }.to raise_error RuntimeError, /Cannot send a ping when connection/
- stop_reactor
+ it 'sends a unique ID in each protocol message (#RTN13e)' do
+ connection.on(:connected) do
+ heartbeat_ids = []
+ pings_complete = []
+ connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :heartbeat
+ heartbeat_ids << protocol_message.id
+ end
+ end
+
+ ping_block = Proc.new do
+ pings_complete << true
+ if pings_complete.length == 3
+ expect(heartbeat_ids.uniq.length).to eql(3)
+ stop_reactor
+ end
+ end
+
+ connection.ping(&ping_block)
+ connection.ping(&ping_block)
+ connection.ping(&ping_block)
end
end
+ it 'waits until the connection becomes CONNECTED when in the CONNETING state' do
+ connection.once(:connecting) do
+ connection.ping do |time_elapsed|
+ expect(connection.state).to eq(:connected)
+ stop_reactor
+ end
+ end
+ end
+
+ context 'with incompatible states' do
+ let(:client_options) { default_options.merge(log_level: :none) }
+
+ context 'when not connected' do
+ it 'fails the deferrable (#RTN13b)' do
+ connection.ping.errback do |error|
+ expect(error.message).to match(/Cannot send a ping when.*initialized/i)
+ stop_reactor
+ end
+ end
+ end
+
+ context 'when suspended' do
+ it 'fails the deferrable (#RTN13b)' do
+ connection.once(:connected) do
+ connection.transition_state_machine! :suspended
+ connection.ping.errback do |error|
+ expect(error.message).to match(/Cannot send a ping when.*suspended/i)
+ stop_reactor
+ end
+ end
+ end
+ end
+
+ context 'when failed' do
+ it 'fails the deferrable (#RTN13b)' do
+ connection.once(:connected) do
+ connection.transition_state_machine! :failed
+ connection.ping.errback do |error|
+ expect(error.message).to match(/Cannot send a ping when.*failed/i)
+ stop_reactor
+ end
+ end
+ end
+ end
+
+ context 'when closed' do
+ it 'fails the deferrable (#RTN13b)' do
+ connection.once(:connected) do
+ connection.close
+ connection.once(:closed) do
+ connection.ping.errback do |error|
+ expect(error.message).to match(/Cannot send a ping when.*closed/i)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
+ context 'when it becomes closed' do
+ it 'fails the deferrable (#RTN13b)' do
+ connection.once(:connected) do
+ connection.ping.errback do |error|
+ expect(error.message).to match(/Ping failed as connection has changed state to.*closing/i)
+ stop_reactor
+ end
+ connection.close
+ end
+ end
+ end
+ end
+
context 'with a success block that raises an exception' do
it 'catches the exception and logs the error' do
connection.on(:connected) do
- expect(connection.logger).to receive(:error).with(/Forced exception/) do
+ expect(connection.logger).to receive(:error) do |*args, &block|
+ expect(args.concat([block ? block.call : nil]).join(',')).to match(/Forced exception/)
stop_reactor
end
connection.ping { raise 'Forced exception' }
end
end
end
context 'when ping times out' do
let(:client_options) { default_options.merge(log_level: :error) }
- it 'logs a warning' do
+ it 'fails the deferrable logs a warning (#RTN13a, #RTN13c)' do
+ message_logged = false
connection.once(:connected) do
allow(connection).to receive(:defaults).and_return(connection.defaults.merge(realtime_request_timeout: 0.0001))
- expect(connection.logger).to receive(:warn).with(/Ping timed out/) do
- stop_reactor
+ expect(connection.logger).to receive(:warn) do |*args, &block|
+ expect(block.call).to match(/Ping timed out/)
+ message_logged = true
end
- connection.ping
+ connection.ping.errback do |error|
+ EventMachine.add_timer(0.1) do
+ expect(error.message).to match(/Ping timed out/)
+ expect(error.code).to eql(50003)
+ expect(message_logged).to be_truthy
+ stop_reactor
+ end
+ end
end
end
it 'yields to the block with a nil value' do
connection.once(:connected) do
@@ -805,30 +926,131 @@
end
end
end
end
+ context 'Heartbeats (#RTN23)' do
+ context 'heartbeat interval' do
+ context 'when reduced artificially' do
+ let(:protocol_message_attributes) do
+ {
+ action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i,
+ connection_serial: 55,
+ connection_details: {
+ max_idle_interval: 2 * 1000
+ }
+ }
+ end
+ let(:client_options) { default_options.merge(realtime_request_timeout: 3) }
+ let(:expected_heartbeat_interval) { 5 }
+
+ it 'is the sum of the max_idle_interval and realtime_request_timeout (#RTN23a)' do
+ connection.once(:connected) do
+ connection.__incoming_protocol_msgbus__.publish :protocol_message, Ably::Models::ProtocolMessage.new(protocol_message_attributes)
+ EventMachine.next_tick do
+ expect(connection.heartbeat_interval).to eql(expected_heartbeat_interval)
+ stop_reactor
+ end
+ end
+ end
+
+ it 'disconnects the transport if no heartbeat received since connected (#RTN23a)' do
+ connection.once(:connected) do
+ connection.__incoming_protocol_msgbus__.publish :protocol_message, Ably::Models::ProtocolMessage.new(protocol_message_attributes)
+ last_received_at = Time.now
+ connection.once(:disconnected) do
+ expect(Time.now.to_i - last_received_at.to_i).to be_within(2).of(expected_heartbeat_interval)
+ stop_reactor
+ end
+ end
+ end
+
+ it 'disconnects the transport if no heartbeat received since last event received (#RTN23a)' do
+ connection.once(:connected) do
+ connection.__incoming_protocol_msgbus__.publish :protocol_message, Ably::Models::ProtocolMessage.new(protocol_message_attributes)
+ last_received_at = Time.now
+ EventMachine.add_timer(3) { client.channels.get('foo').attach }
+ connection.once(:disconnected) do
+ expect(Time.now.to_i - last_received_at.to_i).to be_within(2).of(expected_heartbeat_interval + 3)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
+ context 'transport-level heartbeats are supported in the websocket transport' do
+ it 'provides the heartbeats argument in the websocket connection params (#RTN23b)' do
+ expect(EventMachine).to receive(:connect) do |host, port, transport, object, url|
+ uri = URI.parse(url)
+ expect(CGI::parse(uri.query)['heartbeats'][0]).to eql('false')
+ stop_reactor
+ end
+ client
+ end
+
+ it 'receives websocket heartbeat messages (#RTN23b) [slow test as need to wait for heartbeat]', em_timeout: 45 do
+ skip "Heartbeats param is missing from realtime implementation, see https://github.com/ably/realtime/issues/656"
+
+ connection.once(:connected) do
+ connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :heartbeat
+ expect(protocol_message.attributes[:source]).to eql('websocket')
+ expect(connection.time_since_connection_confirmed_alive?).to be_within(1).of(0)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+
+ context 'with websocket heartbeats disabled (undocumented)' do
+ let(:client_options) { default_options.merge(websocket_heartbeats_disabled: true) }
+
+ it 'does not provide the heartbeats argument in the websocket connection params (#RTN23b)' do
+ expect(EventMachine).to receive(:connect) do |host, port, transport, object, url|
+ uri = URI.parse(url)
+ expect(CGI::parse(uri.query)['heartbeats'][0]).to be_nil
+ stop_reactor
+ end
+ client
+ end
+
+ it 'receives websocket protocol messages (#RTN23b) [slow test as need to wait for heartbeat]', em_timeout: 45 do
+ connection.once(:connected) do
+ connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ if protocol_message.action == :heartbeat
+ expect(protocol_message.attributes[:source]).to_not eql('websocket')
+ expect(connection.time_since_connection_confirmed_alive?).to be_within(1).of(0)
+ stop_reactor
+ end
+ end
+ end
+ end
+ end
+ end
+
context '#details' do
let(:connection) { client.connection }
it 'is nil before connected' do
connection.on(:connecting) do
expect(connection.details).to eql(nil)
stop_reactor
end
end
- it 'contains the ConnectionDetails object once connected' do
+ it 'contains the ConnectionDetails object once connected (#RTN21)' do
connection.on(:connected) do
expect(connection.details).to be_a(Ably::Models::ConnectionDetails)
expect(connection.details.connection_key).to_not be_nil
expect(connection.details.server_id).to_not be_nil
stop_reactor
end
end
- it 'contains the new ConnectionDetails object once a subsequent connection is created' do
+ it 'contains the new ConnectionDetails object once a subsequent connection is created (#RTN21)' do
connection.once(:connected) do
expect(connection.details.connection_key).to_not be_nil
old_key = connection.details.connection_key
connection.close do
connection.once(:connected) do
@@ -839,17 +1061,17 @@
connection.connect
end
end
end
- context 'with a different connection_state_ttl' do
+ context 'with a different default connection_state_ttl' do
before do
old_defaults = Ably::Realtime::Connection::DEFAULTS
stub_const 'Ably::Realtime::Connection::DEFAULTS', old_defaults.merge(connection_state_ttl: 15)
end
- it 'updates the private Connection#connection_state_ttl' do
+ it 'updates the private Connection#connection_state_ttl when received from Ably in ConnectionDetails' do
expect(connection.connection_state_ttl).to eql(15)
connection.once(:connected) do
expect(connection.connection_state_ttl).to be > 15
stop_reactor
@@ -952,11 +1174,11 @@
end
context "opening a new connection using a recently disconnected connection's #recovery_key" do
context 'connection#id and connection#key after recovery' do
it 'remains the same for id and party for key' do
- connection_key_consistent_part_regex = /.*?!(\w{5,})-/
+ connection_key_consistent_part_regex = /.*?!([\w-]{5,})-\w+/
previous_connection_id = nil
previous_connection_key = nil
connection.once(:connected) do
previous_connection_id = connection.id
@@ -1034,32 +1256,32 @@
end
context 'with invalid formatted value sent to server' do
let(:client_options) { default_options.merge(recover: 'not-a-valid-connection-key:1', log_level: :none) }
- it 'emits a fatal error on the connection object, sets the #error_reason and disconnects' do
- connection.once(:error) do |error|
+ it 'sets the #error_reason and moves the connection to FAILED' do
+ connection.once(:failed) do |state_change|
expect(connection.state).to eq(:failed)
- expect(error.message).to match(/Invalid connectionKey/i)
+ expect(state_change.reason.message).to match(/Invalid connectionKey/i)
expect(connection.error_reason.message).to match(/Invalid connectionKey/i)
expect(connection.error_reason.code).to eql(80018)
- expect(connection.error_reason).to eql(error)
+ expect(connection.error_reason).to eql(state_change.reason)
stop_reactor
end
end
end
context 'with expired (missing) value sent to server' do
let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0', log_level: :fatal) }
- it 'emits an error on the connection object, sets the #error_reason, yet will connect anyway' do
- connection.once(:error) do |error|
+ it 'connects but sets the error reason and includes the reason in the state change' do
+ connection.once(:connected) do |state_change|
expect(connection.state).to eq(:connected)
- expect(error.message).to match(/Unable to recover connection/i)
+ expect(state_change.reason.message).to match(/Unable to recover connection/i)
expect(connection.error_reason.message).to match(/Unable to recover connection/i)
expect(connection.error_reason.code).to eql(80008)
- expect(connection.error_reason).to eql(error)
+ expect(connection.error_reason).to eql(state_change.reason)
stop_reactor
end
end
end
end
@@ -1086,36 +1308,38 @@
end
end
end
context 'when a state transition is unsupported' do
- let(:client_options) { default_options.merge(log_level: :none) } # silence FATAL errors
+ let(:client_options) { default_options.merge(log_level: :fatal) } # silence FATAL errors
- it 'emits a InvalidStateChange' do
+ it 'logs the invalid state change as fatal' do
connection.connect do
connection.transition_state_machine :initialized
+ EventMachine.add_timer(1) { stop_reactor }
end
- connection.on(:error) do |error|
- expect(error).to be_a(Ably::Exceptions::InvalidStateChange)
- stop_reactor
+ expect(client.logger).to receive(:fatal).at_least(:once) do |*args, &block|
+ expect(args.concat([block ? block.call : nil]).join(',')).to match(/Unable to transition/)
end
end
end
context 'protocol failure' do
- let(:client_options) { default_options.merge(protocol: :json) }
+ let(:client_options) { default_options.merge(protocol: :json, log_level: :none) }
context 'receiving an invalid ProtocolMessage' do
it 'emits an error on the connection and logs a fatal error message' do
connection.connect do
connection.transport.send(:driver).emit 'message', OpenStruct.new(data: { action: 500 }.to_json)
end
- expect(client.logger).to receive(:fatal).with(/Invalid Protocol Message/)
- connection.on(:error) do |error|
- expect(error.message).to match(/Invalid Protocol Message/)
+ expect(client.logger).to receive(:fatal).at_least(:once) do |*args, &block|
+ expect(args.concat([block ? block.call : nil]).join(',')).to match(/Invalid Protocol Message/)
+ end
+ connection.on(:failed) do |state_change|
+ expect(state_change.reason.message).to match(/Invalid Protocol Message/)
stop_reactor
end
end
end
end
@@ -1248,15 +1472,17 @@
suspended_retry_timeout: 60,
max_connection_state_ttl: 0.05
)
end
- it 'detaches the channels and prevents publishing of messages on those channels' do
+ it 'moves the channels into the suspended state and prevents publishing of messages on those channels' do
channel.attach do
- channel.once(:detached) do
- expect { channel.publish 'test' }.to raise_error(Ably::Exceptions::ChannelInactive)
- stop_reactor
+ channel.once(:suspended) do
+ channel.publish('test').errback do |error|
+ expect(error).to be_a(Ably::Exceptions::MessageQueueingDisabled)
+ stop_reactor
+ end
end
close_connection_proc = Proc.new do
EventMachine.add_timer(0.001) do
if connection.transport.nil?
@@ -1280,12 +1506,14 @@
let(:client_options) { default_options.merge(:key => 'will.not:authenticate', log_level: :none) }
it 'sets all channels to failed and prevents publishing of messages on those channels' do
channel.attach
channel.once(:failed) do
- expect { channel.publish 'test' }.to raise_error(Ably::Exceptions::ChannelInactive)
- stop_reactor
+ channel.publish('test').errback do |error|
+ expect(error).to be_a(Ably::Exceptions::ChannelInactive)
+ stop_reactor
+ end
end
end
end
end
@@ -1312,32 +1540,34 @@
stop_reactor
end
end
context 'ConnectionStateChange object' do
- def unbind
- if connection.transport
- connection.transport.unbind
- else
- EventMachine.add_timer(0.005) { unbind }
- end
- end
-
it 'has current state' do
connection.on(:connected) do |connection_state_change|
+ expect(connection_state_change.current).to be_a(Ably::Realtime::Connection::STATE)
expect(connection_state_change.current).to eq(:connected)
stop_reactor
end
end
it 'has a previous state' do
connection.on(:connected) do |connection_state_change|
+ expect(connection_state_change.previous).to be_a(Ably::Realtime::Connection::STATE)
expect(connection_state_change.previous).to eq(:connecting)
stop_reactor
end
end
+ it 'has the event that generated the state change (#TH5)' do
+ connection.on(:connected) do |connection_state_change|
+ expect(connection_state_change.event).to be_a(Ably::Realtime::Connection::EVENT)
+ expect(connection_state_change.event).to eq(:connected)
+ stop_reactor
+ end
+ end
+
it 'contains a private API protocol_message attribute that is used for special state change events', :api_private do
connection.on(:connected) do |connection_state_change|
expect(connection_state_change.protocol_message).to be_a(Ably::Models::ProtocolMessage)
expect(connection_state_change.reason).to be_nil
stop_reactor
@@ -1383,11 +1613,11 @@
connection.once(:connecting) do
connection.once(:disconnected) do |connection_state_change|
expect(connection_state_change.retry_in).to eql(0)
stop_reactor
end
- unbind
+ EventMachine.add_timer(0.005) { connection.transport.unbind }
end
end
it 'is 0 when an immediate reconnect will occur' do
connection.once(:connected) do
@@ -1404,52 +1634,141 @@
connection.once(:connecting) do
connection.once(:disconnected) do |connection_state_change|
expect(connection_state_change.retry_in).to be > 0
stop_reactor
end
- unbind
+ EventMachine.add_timer(0.005) { connection.transport.unbind }
end
connection.transport.unbind
end
end
end
end
+
+ context 'whilst CONNECTED' do
+ context 'when a CONNECTED message is received (#RTN24)' do
+ let(:connection_key) { random_str(32) }
+ let(:protocol_message_attributes) do
+ {
+ action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i,
+ connection_serial: 55,
+ connection_details: {
+ client_id: 'bob',
+ connection_key: connection_key,
+ connection_state_ttl: 33 * 1000,
+ max_frame_size: 555,
+ max_inbound_rate: 999,
+ max_message_size: 1310,
+ server_id: 'us-east-1-a.foo.com',
+ max_idle_interval: 4 * 1000
+ }
+ }
+ end
+
+ it 'emits an UPDATE event' do
+ connection.once(:connected) do
+ connection.once(:update) do |connection_state_change|
+ expect(connection_state_change.current).to eq(:connected)
+ expect(connection_state_change.previous).to eq(:connected)
+ expect(connection_state_change.retry_in).to be_nil
+ expect(connection_state_change.reason).to be_nil
+ expect(connection.state).to eq(:connected)
+ stop_reactor
+ end
+
+ connection.__incoming_protocol_msgbus__.publish :protocol_message, Ably::Models::ProtocolMessage.new(protocol_message_attributes)
+ end
+ end
+
+ it 'updates the ConnectionDetail and Connection attributes (#RTC8a1)' do
+ connection.once(:connected) do
+ expect(client.auth.client_id).to eql('*')
+
+ connection.once(:update) do |connection_state_change|
+ expect(client.auth.client_id).to eql('bob')
+ expect(connection.key).to eql(connection_key)
+ expect(connection.serial).to eql(55)
+ expect(connection.connection_state_ttl).to eql(33)
+
+ expect(connection.details.client_id).to eql('bob')
+ expect(connection.details.connection_key).to eql(connection_key)
+ expect(connection.details.connection_state_ttl).to eql(33)
+ expect(connection.details.max_frame_size).to eql(555)
+ expect(connection.details.max_inbound_rate).to eql(999)
+ expect(connection.details.max_message_size).to eql(1310)
+ expect(connection.details.server_id).to eql('us-east-1-a.foo.com')
+ expect(connection.details.max_idle_interval).to eql(4)
+ stop_reactor
+ end
+
+ connection.__incoming_protocol_msgbus__.publish :protocol_message, Ably::Models::ProtocolMessage.new(protocol_message_attributes)
+ end
+ end
+ end
+
+ context 'when a CONNECTED message with an error is received' do
+ let(:protocol_message_attributes) do
+ {
+ action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i,
+ connection_serial: 22,
+ error: { code: 50000, message: 'Internal failure' },
+ }
+ end
+
+ it 'emits an UPDATE event' do
+ connection.once(:connected) do
+ connection.on(:update) do |connection_state_change|
+ expect(connection_state_change.current).to eq(:connected)
+ expect(connection_state_change.previous).to eq(:connected)
+ expect(connection_state_change.retry_in).to be_nil
+ expect(connection_state_change.reason).to be_a(Ably::Models::ErrorInfo)
+ expect(connection_state_change.reason.code).to eql(50000)
+ expect(connection_state_change.reason.message).to match(/Internal failure/)
+ expect(connection.state).to eq(:connected)
+ stop_reactor
+ end
+
+ connection.__incoming_protocol_msgbus__.publish :protocol_message, Ably::Models::ProtocolMessage.new(protocol_message_attributes)
+ end
+ end
+ end
+ end
end
context 'version params' do
- it 'sends the protocol version param v' do
+ it 'sends the protocol version param v (#G4, #RTN2f)' do
expect(EventMachine).to receive(:connect) do |host, port, transport, object, url|
uri = URI.parse(url)
- expect(CGI::parse(uri.query)['v'][0]).to eql(Ably::PROTOCOL_VERSION)
+ expect(CGI::parse(uri.query)['v'][0]).to eql('1.0')
stop_reactor
end
client
end
- it 'sends the lib version param lib' do
+ it 'sends the lib version param lib (#RTN2g)' do
expect(EventMachine).to receive(:connect) do |host, port, transport, object, url|
uri = URI.parse(url)
- expect(CGI::parse(uri.query)['lib'][0]).to eql("ruby-#{Ably::VERSION}")
+ expect(CGI::parse(uri.query)['lib'][0]).to match(/^ruby-1\.0\.\d+(-[\w\.]+)?+$/)
stop_reactor
end
client
end
context 'with variant' do
- let(:variant) { 'foo ' }
+ let(:variant) { 'foo' }
before do
Ably.lib_variant = variant
end
after do
Ably.lib_variant = nil
end
- it 'sends the lib version param lib with the variant' do
+ it 'sends the lib version param lib with the variant (#RTN2g + #RSC7b)' do
expect(EventMachine).to receive(:connect) do |host, port, transport, object, url|
uri = URI.parse(url)
- expect(CGI::parse(uri.query)['lib'][0]).to eql("ruby-#{variant}-#{Ably::VERSION}")
+ expect(CGI::parse(uri.query)['lib'][0]).to match(/^ruby-#{variant}-1\.0\.\d+(-[\w\.]+)?$/)
stop_reactor
end
client
end
end