lib/powertrack/streaming/stream.rb in powertrack-1.0.3 vs lib/powertrack/streaming/stream.rb in powertrack-1.1.0
- old
+ new
@@ -16,11 +16,11 @@
include PowerTrack::API
# Includes a logger, void by default
include VoidLogger::LoggerMixin
# The format of the URLs to connect to the various stream services
- FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/streams/track/%s%s.json".freeze
+ FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze
# The default timeout on a connection to PowerTrack. Can be overridden per call.
DEFAULT_CONNECTION_TIMEOUT = 30
# The default timeout for inactivity on a connection to PowerTrack. Can be
@@ -30,30 +30,38 @@
# The default options for using the stream.
DEFAULT_STREAM_OPTIONS = {
connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
# use a client id if you want to leverage the Backfill feature
- client_id: nil
+ client_id: nil,
+ # enable the replay mode to get activities over the last 5 days
+ # see http://support.gnip.com/apis/replay/api_reference.html
+ replay: false
}
DEFAULT_OK_RESPONSE_STATUS = 200
- # the patterns used to identify the various types of message received from GNIP
+ # The patterns used to identify the various types of message received from GNIP
# everything else is an activity
HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/
SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi
+ # The format used to send UTC timestamps in Replay mode
+ REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'
+
attr_reader :username, :account_name, :data_source, :label
# Builds a stream client for one PowerTrack stream.
#
# username, password, account_name, data_source and label are stored as-is
# in instance variables. options is an optional hash merged on top of
# DEFAULT_STREAM_OPTIONS; passing nil is the same as passing an empty hash.
def initialize(username, password, account_name, data_source, label, options=nil)
@username = username
@password = password
@account_name = account_name
@data_source = data_source
@label = label
@options = DEFAULT_STREAM_OPTIONS.merge(options || {})
@client_id = @options[:client_id]
# coerce whatever was passed as :replay into a strict true/false
+ @replay = !!@options[:replay]
# path segment later substituted into FEATURE_URL_FORMAT: 'replay' or 'streams'
+ @stream_mode = @replay ? 'replay' : 'streams'
end
# Adds many rules to your PowerTrack stream’s ruleset.
#
# <tt>POST /rules</tt>
@@ -103,18 +111,22 @@
DEFAULT_TRACK_OPTIONS = {
# receive GZip-compressed payloads ?
compressed: true,
# max number of retries after a disconnection
- max_retries: 3,
+ max_retries: 2,
# advanced options to configure exponential backoff used for retries
backoff: nil,
# max number of seconds to wait for last message handlers to complete
stop_timeout: 10,
# pass message in raw form (JSON formatted string) instead of JSON-decoded
# Ruby objects to message handlers
raw: false,
+ # the starting date from which the activities will be recovered (replay mode only)
+ from: nil,
+ # the ending date to which the activities will be recovered (replay mode only)
+ to: nil,
# called for each message received, except heartbeats
on_message: nil,
# called for each activity received
on_activity: nil,
# called for each system message received
@@ -156,10 +168,11 @@
_url = FEATURE_URL_FORMAT %
[ gnip_server_name(hostname),
gnip_server_port,
@account_name,
@data_source,
+ @stream_mode,
@label,
feature ]
_url += "?client=#{@client_id}" if @client_id
@@ -185,10 +198,11 @@
end
# Opens a new connection to GNIP PowerTrack.
#
# Builds the target URL from the given hostname and optional feature with
# feature_url, then returns a new EventMachine::HttpRequest created with that
# URL and connection_headers as its second argument.
def connect(hostname, feature=nil)
url = feature_url(hostname, feature)
# NOTE(review): the full connection headers are written to the debug log; if
# they carry credentials, those end up in the logs -- confirm this is intended.
+ logger.debug("Connecting to '#{url}' with headers #{connection_headers}...")
EventMachine::HttpRequest.new(url, connection_headers)
end
# Returns the HTTP headers common to each valid PowerTrack request.
# Each call returns a new hash which can be safely modified by the caller.
@@ -281,18 +295,18 @@
end
handle_api_response(resp_status, resp_error, resp_body, options[:ok])
end
- # Returns the type of message received on the stream, nil when the type
- # cannot be identified.
+ # Returns a two-element array: the type of message received on the stream
+ # (:heartbeat, :system or :activity) and, for a system message, its level
+ # (:info, :warn or :error). The level is nil for every other type.
def message_type(message)
# The patterns are tried in order and the first match wins: a blank or
# whitespace-only message is a heartbeat, a message matching
# SYSTEM_MESSAGE_PATTERN is a system message, anything else is an activity.
case message
- when HEARTBEAT_MESSAGE_PATTERN then :heartbeat
- when SYSTEM_MESSAGE_PATTERN then :system
+ when HEARTBEAT_MESSAGE_PATTERN then [ :heartbeat, nil ]
# $1 is the key captured by SYSTEM_MESSAGE_PATTERN (info, warn or error, in
# any letter case); it is lowercased and turned into a symbol to give the level
+ when SYSTEM_MESSAGE_PATTERN then [ :system, $1.downcase.to_sym ]
else
- :activity
+ [ :activity, nil ]
end
end
# Returns the HTTP headers for each valid /track request.
# Each call returns a new hash which can be safely modified by the caller.
@@ -320,12 +334,30 @@
resp_body = nil
EM.run do
logger.info "Starting the reactor..."
con = connect('stream')
- http = con.get(head: track_req_headers(options[:compressed]))
+ get_opts = { head: track_req_headers(options[:compressed]) }
+ # add a timeframe in replay mode
+ if @replay
+ now = Time.now
+ # start 1 hour ago by default
+ from = options[:from] || (now - 60*60)
+ # stop 30 minutes ago by default
+ to = options[:to] || (now - 30*60)
+
+ get_opts[:query] = {
+ 'fromDate' => from.utc.strftime(REPLAY_TIMESTAMP_FORMAT),
+ 'toDate' => to.utc.strftime(REPLAY_TIMESTAMP_FORMAT)
+ }
+
+ logger.info "Replay mode enabled from '#{from}' to '#{to}'"
+ end
+
+ http = con.get(get_opts)
+
# polls to see if the connection should be closed
close_watcher = EM.add_periodic_timer(1) do
# exit if required
if close_now.call
logger.info "Time to close the tracker"
@@ -350,23 +382,25 @@
if disconnected
logger.warn "Message received while already disconnected"
next
end
- # reset retries when some (valid) data are received
- if retrier.retrying?
- logger.info "Resetting retries..."
- retrier.reset!
- end
-
# process the chunk
buffer.process(chunk) do |raw|
logger.debug "New message received"
+
+ # get the message type and its (optional) level
+ m_type, m_level = message_type(raw)
+
+ # reset retries when some (valid) data are received
+ if retrier.retrying? && m_level != :error
+ logger.info "Resetting retries..."
+ retrier.reset!
+ end
+
EM.defer do
# select the right message handler(s) according to the message type
- m_type = message_type(raw)
-
if m_type == :heartbeat
on_heartbeat.call if on_heartbeat
else
# JSON decoding if required
message = options[:raw] ? raw : MultiJson.decode(raw)
@@ -399,9 +433,10 @@
close_watcher.cancel
resp_status = http_client.response_header.status || DEFAULT_OK_RESPONSE_STATUS
resp_error = http_client.error
resp_body = http_client.response
+
wait_til_defers_finish_and_stop(stop_timeout)
end
end
http.callback(&reconnect_cb)