lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.8.2 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.8.3
- old
+ new
@@ -182,20 +182,27 @@
# @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN
# @return [EventMachine::Deferrable]
# @api private
def internet_up?
+ url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
EventMachine::DefaultDeferrable.new.tap do |deferrable|
- EventMachine::HttpRequest.new("http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}").get.tap do |http|
+ EventMachine::HttpRequest.new(url).get.tap do |http|
http.errback do
yield false if block_given?
- deferrable.fail
+ deferrable.fail "Unable to connect to #{url}"
end
http.callback do
- result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
- yield result if block_given?
- deferrable.succeed
+ EventMachine.next_tick do
+ result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
+ yield result if block_given?
+ if result
+ deferrable.succeed
+ else
+ deferrable.fail "Unexpected response from #{url} (#{http.response_header.status})"
+ end
+ end
end
end
end
end
@@ -273,11 +280,11 @@
attr_reader :current_host
# @!attribute [r] port
# @return [Integer] The default port used for this connection
def port
- client.use_tls? ? 443 : 80
+ client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
end
# @!attribute [r] logger
# @return [Logger] The {Ably::Logger} for this client.
# Configure the log_level with the `:log_level` option, refer to {Ably::Realtime::Client#initialize}
@@ -309,52 +316,55 @@
# @api private
def notify_message_dispatcher_of_new_message(protocol_message)
__outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
end
+ # @return [EventMachine::Deferrable]
# @api private
def create_websocket_transport
- raise ArgumentError, 'Block required' unless block_given?
+ EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
+ # Getting auth params can be blocking so uses a Deferrable
+ client.auth.auth_params.tap do |auth_deferrable|
+ auth_deferrable.callback do |auth_params|
+ url_params = auth_params.merge(
+ timestamp: as_since_epoch(Time.now),
+ format: client.protocol,
+ echo: client.echo_messages
+ )
- blocking_operation = proc do
- URI(client.endpoint).tap do |endpoint|
- url_params = client.auth.auth_params.merge(
- timestamp: as_since_epoch(Time.now),
- format: client.protocol,
- echo: client.echo_messages
- )
-
- if connection_resumable?
- url_params.merge! resume: key, connection_serial: serial
- logger.debug "Resuming connection key #{key} with serial #{serial}"
- elsif connection_recoverable?
- url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
- logger.debug "Recovering connection with key #{client.recover}"
- once(:connected, :closed, :failed) do
- client.disable_automatic_connection_recovery
+ if connection_resumable?
+ url_params.merge! resume: key, connection_serial: serial
+ logger.debug "Resuming connection key #{key} with serial #{serial}"
+ elsif connection_recoverable?
+ url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
+ logger.debug "Recovering connection with key #{client.recover}"
+ once(:connected, :closed, :failed) do
+ client.disable_automatic_connection_recovery
+ end
end
- end
- endpoint.query = URI.encode_www_form(url_params)
- end.to_s
- end
+ url = URI(client.endpoint).tap do |endpoint|
+ endpoint.query = URI.encode_www_form(url_params)
+ end.to_s
- callback = proc do |url|
- determine_host do |host|
- begin
- logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
- @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
- yield websocket_transport if block_given?
+ determine_host do |host|
+ begin
+ logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
+ @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
+ websocket_deferrable.succeed websocket_transport
+ end
+ rescue EventMachine::ConnectionError => error
+ websocket_deferrable.fail error
+ end
end
- rescue EventMachine::ConnectionError => error
- manager.connection_opening_failed error
end
+
+ auth_deferrable.errback do |error|
+ websocket_deferrable.fail error
+ end
end
end
-
- # client.auth.auth_params is a blocking call, so defer this into a thread
- EventMachine.defer blocking_operation, callback
end
# @api private
def release_websocket_transport
@transport = nil
@@ -386,10 +396,17 @@
# @api private
def off_resume(&callback)
resume_callbacks.delete(callback)
end
+ # Returns false if messages cannot be published as a result of message queueing being disabled
+ # @api private
+ def can_publish_messages?
+ connected? ||
+ ( (initialized? || connecting? || disconnected?) && client.queue_messages )
+ end
+
# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state
private
@@ -449,11 +466,27 @@
def connection_recover_parts
client.recover.to_s.match(RECOVER_REGEX)
end
+ def production?
+ client.environment.nil? || client.environment == :production
+ end
+
+ def custom_port?
+ if client.use_tls?
+ !!client.custom_tls_port
+ else
+ !!client.custom_port
+ end
+ end
+
+ def custom_host?
+ !!client.custom_realtime_host
+ end
+
def can_use_fallback_hosts?
- if client.environment.nil? && client.custom_realtime_host.nil?
+ if production? && !custom_port? && !custom_host?
if connecting? && previous_state
use_fallback_if_disconnected? || use_fallback_if_suspended?
end
end
end