lib/powertrack/streaming/stream.rb in powertrack-1.0.3 vs lib/powertrack/streaming/stream.rb in powertrack-1.1.0

- old
+ new

@@ -16,11 +16,11 @@ include PowerTrack::API # Includes a logger, void by default include VoidLogger::LoggerMixin # The format of the URLs to connect to the various stream services - FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/streams/track/%s%s.json".freeze + FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze # The default timeout on a connection to PowerTrack. Can be overriden per call. DEFAULT_CONNECTION_TIMEOUT = 30 # The default timeout for inactivity on a connection to PowerTrack. Can be @@ -30,30 +30,38 @@ # The default options for using the stream. DEFAULT_STREAM_OPTIONS = { connect_timeout: DEFAULT_CONNECTION_TIMEOUT, inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT, # use a client id if you want to leverage the Backfill feature - client_id: nil + client_id: nil, + # enable the replay mode to get activities over the last 5 days + # see http://support.gnip.com/apis/replay/api_reference.html + replay: false } DEFAULT_OK_RESPONSE_STATUS = 200 - # the patterns used to identify the various types of message received from GNIP + # The patterns used to identify the various types of message received from GNIP # everything else is an activity HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/ SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi + # The format used to send UTC timestamps in Replay mode + REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M' + attr_reader :username, :account_name, :data_source, :label def initialize(username, password, account_name, data_source, label, options=nil) @username = username @password = password @account_name = account_name @data_source = data_source @label = label @options = DEFAULT_STREAM_OPTIONS.merge(options || {}) @client_id = @options[:client_id] + @replay = !!@options[:replay] + @stream_mode = @replay ? 'replay' : 'streams' end # Adds many rules to your PowerTrack stream’s ruleset. # # <tt>POST /rules</tt> @@ -103,18 +111,22 @@ DEFAULT_TRACK_OPTIONS = { # receive GZip-compressed payloads ? compressed: true, # max number of retries after a disconnection - max_retries: 3, + max_retries: 2, # advanced options to configure exponential backoff used for retries backoff: nil, # max number of seconds to wait for last message handlers to complete stop_timeout: 10, # pass message in raw form (JSON formatted string) instead of JSON-decoded # Ruby objects to message handlers raw: false, + # the starting date from which the activities will be recovered (replay mode only) + from: nil, + # the ending date to which the activities will be recovered (replay mode only) + to: nil, # called for each message received, except heartbeats on_message: nil, # called for each activity received on_activity: nil, # called for each system message received @@ -156,10 +168,11 @@ _url = FEATURE_URL_FORMAT % [ gnip_server_name(hostname), gnip_server_port, @account_name, @data_source, + @stream_mode, @label, feature ] _url += "?client=#{@client_id}" if @client_id @@ -185,10 +198,11 @@ end # Opens a new connection to GNIP PowerTrack. def connect(hostname, feature=nil) url = feature_url(hostname, feature) + logger.debug("Connecting to '#{url}' with headers #{connection_headers}...") EventMachine::HttpRequest.new(url, connection_headers) end # Returns the HTTP headers common to each valid PowerTrack request. # Each call returns a new hash which can be safely modified by the caller. @@ -281,18 +295,18 @@ end handle_api_response(resp_status, resp_error, resp_body, options[:ok]) end - # Returns the type of message received on the stream, nil when the type - # cannot be identified. + # Returns the type of message received on the stream, together with a + # level indicator in case of a system message, nil otherwise. def message_type(message) case message - when HEARTBEAT_MESSAGE_PATTERN then :heartbeat - when SYSTEM_MESSAGE_PATTERN then :system + when HEARTBEAT_MESSAGE_PATTERN then [ :heartbeat, nil ] + when SYSTEM_MESSAGE_PATTERN then [ :system, $1.downcase.to_sym ] else - :activity + [ :activity, nil ] end end # Returns the HTTP headers for each valid /track request. # Each call returns a new hash which can be safely modified by the caller. @@ -320,12 +334,30 @@ resp_body = nil EM.run do logger.info "Starting the reactor..." con = connect('stream') - http = con.get(head: track_req_headers(options[:compressed])) + get_opts = { head: track_req_headers(options[:compressed]) } + # add a timeframe in replay mode + if @replay + now = Time.now + # start 1 hour ago by default + from = options[:from] || (now - 60*60) + # stop 30 minutes ago by default + to = options[:to] || (now - 30*60) + + get_opts[:query] = { + 'fromDate' => from.utc.strftime(REPLAY_TIMESTAMP_FORMAT), + 'toDate' => to.utc.strftime(REPLAY_TIMESTAMP_FORMAT) + } + + logger.info "Replay mode enabled from '#{from}' to '#{to}'" + end + + http = con.get(get_opts) + # polls to see if the connection should be closed close_watcher = EM.add_periodic_timer(1) do # exit if required if close_now.call logger.info "Time to close the tracker" @@ -350,23 +382,25 @@ if disconnected logger.warn "Message received while already disconnected" next end - # reset retries when some (valid) data are received - if retrier.retrying? - logger.info "Resetting retries..." - retrier.reset! - end - # process the chunk buffer.process(chunk) do |raw| logger.debug "New message received" + + # get the message type and its (optional) level + m_type, m_level = message_type(raw) + + # reset retries when some (valid) data are received + if retrier.retrying? && m_level != :error + logger.info "Resetting retries..." + retrier.reset! + end + EM.defer do # select the right message handler(s) according to the message type - m_type = message_type(raw) - if m_type == :heartbeat on_heartbeat.call if on_heartbeat else # JSON decoding if required message = options[:raw] ? raw : MultiJson.decode(raw) @@ -399,9 +433,10 @@ close_watcher.cancel resp_status = http_client.response_header.status || DEFAULT_OK_RESPONSE_STATUS resp_error = http_client.error resp_body = http_client.response + wait_til_defers_finish_and_stop(stop_timeout) end end http.callback(&reconnect_cb)