lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.8.2 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.8.3

- old
+ new

@@ -182,20 +182,27 @@ # @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN # @return [EventMachine::Deferrable] # @api private def internet_up? + url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}" EventMachine::DefaultDeferrable.new.tap do |deferrable| - EventMachine::HttpRequest.new("http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}").get.tap do |http| + EventMachine::HttpRequest.new(url).get.tap do |http| http.errback do yield false if block_given? - deferrable.fail + deferrable.fail "Unable to connect to #{url}" end http.callback do - result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text) - yield result if block_given? - deferrable.succeed + EventMachine.next_tick do + result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text) + yield result if block_given? + if result + deferrable.succeed + else + deferrable.fail "Unexpected response from #{url} (#{http.response_header.status})" + end + end end end end end @@ -273,11 +280,11 @@ attr_reader :current_host # @!attribute [r] port # @return [Integer] The default port used for this connection def port - client.use_tls? ? 443 : 80 + client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80 end # @!attribute [r] logger # @return [Logger] The {Ably::Logger} for this client. # Configure the log_level with the `:log_level` option, refer to {Ably::Realtime::Client#initialize} @@ -309,52 +316,55 @@ # @api private def notify_message_dispatcher_of_new_message(protocol_message) __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message end + # @return [EventMachine::Deferrable] # @api private def create_websocket_transport - raise ArgumentError, 'Block required' unless block_given? + EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable| + # Getting auth params can be blocking so uses a Deferrable + client.auth.auth_params.tap do |auth_deferrable| + auth_deferrable.callback do |auth_params| + url_params = auth_params.merge( + timestamp: as_since_epoch(Time.now), + format: client.protocol, + echo: client.echo_messages + ) - blocking_operation = proc do - URI(client.endpoint).tap do |endpoint| - url_params = client.auth.auth_params.merge( - timestamp: as_since_epoch(Time.now), - format: client.protocol, - echo: client.echo_messages - ) - - if connection_resumable? - url_params.merge! resume: key, connection_serial: serial - logger.debug "Resuming connection key #{key} with serial #{serial}" - elsif connection_recoverable? - url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial] - logger.debug "Recovering connection with key #{client.recover}" - once(:connected, :closed, :failed) do - client.disable_automatic_connection_recovery + if connection_resumable? + url_params.merge! resume: key, connection_serial: serial + logger.debug "Resuming connection key #{key} with serial #{serial}" + elsif connection_recoverable? + url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial] + logger.debug "Recovering connection with key #{client.recover}" + once(:connected, :closed, :failed) do + client.disable_automatic_connection_recovery + end end - end - endpoint.query = URI.encode_www_form(url_params) - end.to_s - end + url = URI(client.endpoint).tap do |endpoint| + endpoint.query = URI.encode_www_form(url_params) + end.to_s - callback = proc do |url| - determine_host do |host| - begin - logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'" - @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport| - yield websocket_transport if block_given? + determine_host do |host| + begin + logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'" + @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport| + websocket_deferrable.succeed websocket_transport + end + rescue EventMachine::ConnectionError => error + websocket_deferrable.fail error + end end - rescue EventMachine::ConnectionError => error - manager.connection_opening_failed error end + + auth_deferrable.errback do |error| + websocket_deferrable.fail error + end end end - - # client.auth.auth_params is a blocking call, so defer this into a thread - EventMachine.defer blocking_operation, callback end # @api private def release_websocket_transport @transport = nil @@ -386,10 +396,17 @@ # @api private def off_resume(&callback) resume_callbacks.delete(callback) end + # Returns false if messages cannot be published as a result of message queueing being disabled + # @api private + def can_publish_messages? + connected? || + ( (initialized? || connecting? || disconnected?) && client.queue_messages ) + end + # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state private @@ -449,11 +466,27 @@ def connection_recover_parts client.recover.to_s.match(RECOVER_REGEX) end + def production? + client.environment.nil? || client.environment == :production + end + + def custom_port? + if client.use_tls? + !!client.custom_tls_port + else + !!client.custom_port + end + end + + def custom_host? + !!client.custom_realtime_host + end + def can_use_fallback_hosts? - if client.environment.nil? && client.custom_realtime_host.nil? + if production? && !custom_port? && !custom_host? if connecting? && previous_state use_fallback_if_disconnected? || use_fallback_if_suspended? end end end