lib/pubnub.rb in pubnub-3.3.0.2 vs lib/pubnub.rb in pubnub-3.3.0.5

- old
+ new

@@ -26,10 +26,21 @@ require 'active_support/core_ext/object/blank' class Pubnub + SUCCESS_RESPONSE = 200 + + TIMEOUT_BAD_RESPONSE_CODE = 1 + TIMEOUT_BAD_JSON_RESPONSE = 0.5 + TIMEOUT_GENERAL_ERROR = 1 + TIMEOUT_SUBSCRIBE = 310 + TIMEOUT_NON_SUBSCRIBE = 5 + + PUBNUB_LOGGER = Logger.new("/tmp/pubnubError.log") + PUBNUB_LOGGER.level = Logger::DEBUG + class PresenceError < RuntimeError; end class PublishError < RuntimeError; end class SubscribeError < RuntimeError; @@ -37,14 +48,11 @@ class InitError < RuntimeError; end attr_accessor :publish_key, :subscribe_key, :secret_key, :cipher_key, :ssl, :channel, :origin, :session_uuid - - #ORIGINS = %w(newcloud-virginia.pubnub.com newcloud-california.pubnub.com newcloud-ireland.pubnub.com newcloud-tokyo.pubnub.com) ORIGIN_HOST = 'pubsub.pubnub.com' - #ORIGIN_HOST = 'newcloud-california.pubnub.com' #ORIGIN_HOST = 'test.pubnub.com' def initialize(*args) if args.size == 5 # passing in named parameters @@ -83,10 +91,11 @@ publish_request = PubnubRequest.new(:operation => :publish) #TODO: refactor into initializer code on request instantiation publish_request.ssl = @ssl + publish_request.set_origin(options) publish_request.set_channel(options) publish_request.set_callback(options) publish_request.set_cipher_key(options, self.cipher_key) publish_request.set_message(options, self.cipher_key) publish_request.set_publish_key(options, self.publish_key) @@ -107,10 +116,11 @@ subscribe_request = PubnubRequest.new(:operation => operation, :session_uuid => @session_uuid) #TODO: refactor into initializer code on request instantiation subscribe_request.ssl = @ssl + subscribe_request.set_origin(options) subscribe_request.set_channel(options) subscribe_request.set_callback(options) subscribe_request.set_cipher_key(options, self.cipher_key) unless subscribe_request.operation == "presence" subscribe_request.set_subscribe_key(options, self.subscribe_key) @@ -265,55 +275,29 @@ end end end private - + def _request(request, is_reactor_running = false) request.format_url! - #puts("- Fetching #{request.url}") Thread.new{ begin - conn = EM::HttpRequest.new(request.url, :inactivity_timeout => 310)#client times out in 310s unless the server returns or timeout first + operation_timeout = %w(subscribe presence).include?(request.operation) ? TIMEOUT_SUBSCRIBE : TIMEOUT_NON_SUBSCRIBE + conn = EM::HttpRequest.new(request.url, :inactivity_timeout => operation_timeout) #client times out in 310s unless the server returns or timeout first req = conn.get() req.errback{ - if req.response.blank? - puts("#{Time.now}: Reconnecting from timeout.") - - EM::Timer.new(1) do - _request(request, is_reactor_running) - end - else - error_message = "Unknown Error: #{req.response.to_s}" - puts(error_message) - request.callback.call([0, error_message]) - - EM.stop unless is_reactor_running - end + logAndRetryGeneralError(is_reactor_running, req, request) } req.callback { - request.package_response!(req.response) - cycle = request.callback.call(request.response) - - only_success_status_is_acceptable = 200 - - if (req.response_header.http_status.to_i != only_success_status_is_acceptable) - - error_message = "Server Error, status: #{req.response_header.http_status}, extended info: #{req.response}" - puts(error_message) - EM.stop unless is_reactor_running + if checkForBadJSON(req) == true + logAndRetryBadJSON(is_reactor_running, req, request) else - - if %w(subscribe presence).include?(request.operation) && (cycle != false || request.first_request?) - _request(request, is_reactor_running) - else - EM.stop unless is_reactor_running - end - + processGoodResponse(is_reactor_running, req, request) end } rescue EventMachine::ConnectionError, RuntimeError => e # RuntimeError for catching "EventMachine not initialized" error_message = "Network Error: #{e.message}" @@ -321,8 +305,74 @@ return [0, error_message] end } end + def checkForBadJSON(req) + jsonError = false + begin + JSON.parse(req.response) + rescue => e + jsonError = true + end + jsonError + end + def processGoodResponse(is_reactor_running, req, request) -end + if (req.response_header.http_status.to_i != SUCCESS_RESPONSE) + logAndRetryBadResponseCode(is_reactor_running, req, request) + else + + request.package_response!(req.response) + cycle = request.callback.call(request.response) + + if %w(subscribe presence).include?(request.operation) && (cycle != false || request.first_request?) + _request(request, is_reactor_running) + else + EM.stop unless is_reactor_running + end + end + end + + def logAndRetryGeneralError(is_reactor_running, req, request) + errMsg = "#{Time.now}: Network connectivity issue while attempting to reach #{request.url}" + logError(errMsg) + retryRequest(is_reactor_running, req, request, TIMEOUT_GENERAL_ERROR) + end + + def logAndRetryBadJSON(is_reactor_running, req, request) + errMsg = "#{Time.now}: Retrying from bad JSON: #{req.response.to_s}" + logError(errMsg) + retryRequest(is_reactor_running, req, request, TIMEOUT_BAD_JSON_RESPONSE) + end + + def logAndRetryBadResponseCode(is_reactor_running, req, request) + errMsg = "#{Time.now}: Retrying from bad server response code: (#{req.response_header.http_status.to_i}) #{req.response.to_s}" + logError(errMsg) + retryRequest(is_reactor_running, req, request, TIMEOUT_BAD_RESPONSE_CODE) + end + + def logError(errMsg) + PUBNUB_LOGGER.debug(errMsg) + end + + def retryRequest(is_reactor_running, req, request, delay) + + if %w(subscribe presence).include?(request.operation) + EM::Timer.new(delay) do + _request(request, is_reactor_running) + end + else + error_msg = [0, "Request to #{request.url} failed."] + + request.set_error(true) + request.callback.call(error_msg) + + PUBNUB_LOGGER.debug(error_msg) + + EM.stop unless is_reactor_running + end + + end + +end \ No newline at end of file