lib/pubnub.rb in pubnub-3.3.0.2 vs lib/pubnub.rb in pubnub-3.3.0.5
- old
+ new
@@ -26,10 +26,21 @@
require 'active_support/core_ext/object/blank'
class Pubnub
+ SUCCESS_RESPONSE = 200
+
+ TIMEOUT_BAD_RESPONSE_CODE = 1
+ TIMEOUT_BAD_JSON_RESPONSE = 0.5
+ TIMEOUT_GENERAL_ERROR = 1
+ TIMEOUT_SUBSCRIBE = 310
+ TIMEOUT_NON_SUBSCRIBE = 5
+
+ PUBNUB_LOGGER = Logger.new("/tmp/pubnubError.log")
+ PUBNUB_LOGGER.level = Logger::DEBUG
+
class PresenceError < RuntimeError;
end
class PublishError < RuntimeError;
end
class SubscribeError < RuntimeError;
@@ -37,14 +48,11 @@
class InitError < RuntimeError;
end
attr_accessor :publish_key, :subscribe_key, :secret_key, :cipher_key, :ssl, :channel, :origin, :session_uuid
-
- #ORIGINS = %w(newcloud-virginia.pubnub.com newcloud-california.pubnub.com newcloud-ireland.pubnub.com newcloud-tokyo.pubnub.com)
ORIGIN_HOST = 'pubsub.pubnub.com'
- #ORIGIN_HOST = 'newcloud-california.pubnub.com'
#ORIGIN_HOST = 'test.pubnub.com'
def initialize(*args)
if args.size == 5 # passing in named parameters
@@ -83,10 +91,11 @@
publish_request = PubnubRequest.new(:operation => :publish)
#TODO: refactor into initializer code on request instantiation
publish_request.ssl = @ssl
+ publish_request.set_origin(options)
publish_request.set_channel(options)
publish_request.set_callback(options)
publish_request.set_cipher_key(options, self.cipher_key)
publish_request.set_message(options, self.cipher_key)
publish_request.set_publish_key(options, self.publish_key)
@@ -107,10 +116,11 @@
subscribe_request = PubnubRequest.new(:operation => operation, :session_uuid => @session_uuid)
#TODO: refactor into initializer code on request instantiation
subscribe_request.ssl = @ssl
+ subscribe_request.set_origin(options)
subscribe_request.set_channel(options)
subscribe_request.set_callback(options)
subscribe_request.set_cipher_key(options, self.cipher_key) unless subscribe_request.operation == "presence"
subscribe_request.set_subscribe_key(options, self.subscribe_key)
@@ -265,55 +275,29 @@
end
end
end
private
-
+
def _request(request, is_reactor_running = false)
request.format_url!
- #puts("- Fetching #{request.url}")
Thread.new{
begin
- conn = EM::HttpRequest.new(request.url, :inactivity_timeout => 310)#client times out in 310s unless the server returns or timeout first
+ operation_timeout = %w(subscribe presence).include?(request.operation) ? TIMEOUT_SUBSCRIBE : TIMEOUT_NON_SUBSCRIBE
+ conn = EM::HttpRequest.new(request.url, :inactivity_timeout => operation_timeout) #client times out in 310s unless the server returns or timeout first
req = conn.get()
req.errback{
- if req.response.blank?
- puts("#{Time.now}: Reconnecting from timeout.")
-
- EM::Timer.new(1) do
- _request(request, is_reactor_running)
- end
- else
- error_message = "Unknown Error: #{req.response.to_s}"
- puts(error_message)
- request.callback.call([0, error_message])
-
- EM.stop unless is_reactor_running
- end
+ logAndRetryGeneralError(is_reactor_running, req, request)
}
req.callback {
- request.package_response!(req.response)
- cycle = request.callback.call(request.response)
-
- only_success_status_is_acceptable = 200
-
- if (req.response_header.http_status.to_i != only_success_status_is_acceptable)
-
- error_message = "Server Error, status: #{req.response_header.http_status}, extended info: #{req.response}"
- puts(error_message)
- EM.stop unless is_reactor_running
+ if checkForBadJSON(req) == true
+ logAndRetryBadJSON(is_reactor_running, req, request)
else
-
- if %w(subscribe presence).include?(request.operation) && (cycle != false || request.first_request?)
- _request(request, is_reactor_running)
- else
- EM.stop unless is_reactor_running
- end
-
+ processGoodResponse(is_reactor_running, req, request)
end
}
rescue EventMachine::ConnectionError, RuntimeError => e # RuntimeError for catching "EventMachine not initialized"
error_message = "Network Error: #{e.message}"
@@ -321,8 +305,74 @@
return [0, error_message]
end
}
end
+ def checkForBadJSON(req)
+ jsonError = false
+ begin
+ JSON.parse(req.response)
+ rescue => e
+ jsonError = true
+ end
+ jsonError
+ end
+ def processGoodResponse(is_reactor_running, req, request)
-end
+ if (req.response_header.http_status.to_i != SUCCESS_RESPONSE)
+ logAndRetryBadResponseCode(is_reactor_running, req, request)
+ else
+
+ request.package_response!(req.response)
+ cycle = request.callback.call(request.response)
+
+ if %w(subscribe presence).include?(request.operation) && (cycle != false || request.first_request?)
+ _request(request, is_reactor_running)
+ else
+ EM.stop unless is_reactor_running
+ end
+ end
+ end
+
+ def logAndRetryGeneralError(is_reactor_running, req, request)
+ errMsg = "#{Time.now}: Network connectivity issue while attempting to reach #{request.url}"
+ logError(errMsg)
+ retryRequest(is_reactor_running, req, request, TIMEOUT_GENERAL_ERROR)
+ end
+
+ def logAndRetryBadJSON(is_reactor_running, req, request)
+ errMsg = "#{Time.now}: Retrying from bad JSON: #{req.response.to_s}"
+ logError(errMsg)
+ retryRequest(is_reactor_running, req, request, TIMEOUT_BAD_JSON_RESPONSE)
+ end
+
+ def logAndRetryBadResponseCode(is_reactor_running, req, request)
+ errMsg = "#{Time.now}: Retrying from bad server response code: (#{req.response_header.http_status.to_i}) #{req.response.to_s}"
+ logError(errMsg)
+ retryRequest(is_reactor_running, req, request, TIMEOUT_BAD_RESPONSE_CODE)
+ end
+
+ def logError(errMsg)
+ PUBNUB_LOGGER.debug(errMsg)
+ end
+
+ def retryRequest(is_reactor_running, req, request, delay)
+
+ if %w(subscribe presence).include?(request.operation)
+ EM::Timer.new(delay) do
+ _request(request, is_reactor_running)
+ end
+ else
+ error_msg = [0, "Request to #{request.url} failed."]
+
+ request.set_error(true)
+ request.callback.call(error_msg)
+
+ PUBNUB_LOGGER.debug(error_msg)
+
+ EM.stop unless is_reactor_running
+ end
+
+ end
+
+end
\ No newline at end of file