lib/powertrack/streaming/stream.rb in powertrack-1.1.1 vs lib/powertrack/streaming/stream.rb in powertrack-1.2.0

- old
+ new

@@ -16,72 +16,92 @@
  include PowerTrack::API
  # Includes a logger, void by default
  include VoidLogger::LoggerMixin
  # The format of the URLs to connect to the various stream services
- FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze
+ FEATURE_URL_FORMAT = {
+   # [ hostname, account, source, mode, label, feature ]
+   v1: "https://%s.gnip.com/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze,
+   # [ hostname, feature, account, source, label, sub-feature ]
+   v2: "https://gnip-%s.twitter.com/%s/powertrack/accounts/%s/publishers/%s/%s%s.json".freeze
+ }.freeze
  # The default timeout on a connection to PowerTrack. Can be overridden per call.
  DEFAULT_CONNECTION_TIMEOUT = 30
  # The default timeout for inactivity on a connection to PowerTrack. Can be
  # overridden per call.
  DEFAULT_INACTIVITY_TIMEOUT = 50
  # The default options for using the stream.
  DEFAULT_STREAM_OPTIONS = {
+   # enable PowerTrack v2 API (using v1 by default)
+   v2: false,
+   # override the default connection timeout
    connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
+   # override the default inactivity timeout
    inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
-   # use a client id if you want to leverage the Backfill feature
+   # use a client id if you want to leverage the Backfill feature in v1
    client_id: nil,
    # enable the replay mode to get activities over the last 5 days
    # see http://support.gnip.com/apis/replay/api_reference.html
    replay: false
- }
+ }.freeze
  DEFAULT_OK_RESPONSE_STATUS = 200
  # The patterns used to identify the various types of message received from GNIP
  # everything else is an activity
- HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/
- SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi
+ HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/.freeze
+ SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi.freeze
  # The format used to send UTC timestamps in Replay mode
- REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'
+ REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'.freeze
  attr_reader :username,
  :account_name, :data_source, :label
  def initialize(username, password, account_name, data_source, label, options=nil)
    @username = username
    @password = password
    @account_name = account_name
    @data_source = data_source
    @label = label
    @options = DEFAULT_STREAM_OPTIONS.merge(options || {})
-   @client_id = @options[:client_id]
    @replay = !!@options[:replay]
+   @client_id = @options[:client_id]
    @stream_mode = @replay ? 'replay' : 'streams'
+
+   # force v1 if Replay activated
+   @v2 = !@replay && !!@options[:v2]
  end
  # Adds many rules to your PowerTrack stream’s ruleset.
  #
  # <tt>POST /rules</tt>
  #
  # See http://support.gnip.com/apis/powertrack/api_reference.html#AddRules
  def add_rules(*rules)
    # flatten the rules in case it was provided as an array
-   make_rules_request(:post, body: MultiJson.encode('rules' => rules.flatten), ok: 201)
+   make_rules_request(:post,
+     body: MultiJson.encode('rules' => rules.flatten),
+     ok: 201)
  end
  # Removes the specified rules from the stream.
  #
  # <tt>DELETE /rules</tt>
  #
  # See http://support.gnip.com/apis/powertrack/api_reference.html#DeleteRules
  def delete_rules(*rules)
+   # v2 does not use DELETE anymore
+   delete_verb = @v2 ? :post : :delete
    # flatten the rules in case it was provided as an array
-   make_rules_request(:delete, body: MultiJson.encode('rules' => rules.flatten))
+   delete_options = { body: MultiJson.encode('rules' => rules.flatten) }
+   # v2 uses a query parameter
+   delete_options[:query] = { '_method' => 'delete' } if @v2
+
+   make_rules_request(delete_verb, delete_options)
  end
  DEFAULT_LIST_RULES_OPTIONS = {
    compressed: true,
    objectify: true
@@ -101,11 +121,13 @@
    # return Rule objects when required and feasible/appropriate
    if options[:objectify] &&
      res.is_a?(Hash) &&
      (rules = res['rules']).is_a?(Array) &&
      rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') }
-     rules.map { |rule| PowerTrack::Rule.new(rule['value'], rule['tag']) }
+     rules.map do |rule|
+       PowerTrack::Rule.new(rule['value'], tag: rule['tag'], id: rule['id'])
+     end
    else
      res
    end
  end
@@ -123,10 +145,12 @@
    raw: false,
    # the starting date from which the activities will be recovered (replay mode only)
    from: nil,
    # the ending date to which the activities will be recovered (replay mode only)
    to: nil,
+   # specify a number of minutes to leverage the Backfill feature (v2 only)
+   backfill_minutes: nil,
    # called for each message received, except heartbeats
    on_message: nil,
    # called for each activity received
    on_activity: nil,
    # called for each system message received
@@ -138,10 +162,12 @@
  }.freeze
  # Establishes a persistent connection to the PowerTrack data stream,
  # through which the social data will be delivered.
  #
+ # Manages reconnections when being disconnected.
+ #
  # <tt>GET /track/:stream</tt>
  #
  # See http://support.gnip.com/apis/powertrack/api_reference.html#Stream
  def track(options=nil)
    options = DEFAULT_TRACK_OPTIONS.merge(options || {})
@@ -149,34 +175,35 @@
    handle_api_response(*retrier.retry { track_once(options, retrier) })
  end
  private
- # Returns the fully-qualified domain name of a GNIP PowerTrack server
- # based on a hostname.
- def gnip_server_name(hostname)
-   "%s.gnip.com" % [ hostname ]
- end
-
- # Returns the port used by GNIP PowerTrack servers.
- def gnip_server_port
-   '443'
- end
-
  # Returns the URL of the stream for a given feature.
- def feature_url(hostname, feature=nil)
-   feature = feature ? "/#{feature}" : ''
-   _url = FEATURE_URL_FORMAT %
-     [ gnip_server_name(hostname),
-       gnip_server_port,
-       @account_name,
-       @data_source,
-       @stream_mode,
-       @label,
-       feature ]
+ def feature_url(hostname, feature=nil, sub_feature=nil)
+   _url = nil
+   if @v2
+     feature ||= hostname
+     sub_feature = sub_feature ? "/#{sub_feature}" : ''
+     _url = FEATURE_URL_FORMAT[:v2] %
+       [ hostname,
+         feature,
+         @account_name,
+         @data_source,
+         @label,
+         sub_feature ]
+   else
+     feature = feature ? "/#{feature}" : ''
+     _url = FEATURE_URL_FORMAT[:v1] %
+       [ hostname,
+         @account_name,
+         @data_source,
+         @stream_mode,
+         @label,
+         feature ]
-   _url += "?client=#{@client_id}" if @client_id
+     _url += "?client=#{@client_id}" if @client_id
+   end
    _url
  end
  # Returns the HTTP header that turns on GZip-based compression if required.
@@ -196,12 +223,12 @@
    { connect_timeout: @options[:connect_timeout],
      inactivity_timeout: @options[:inactivity_timeout] }
  end
  # Opens a new connection to GNIP PowerTrack.
- def connect(hostname, feature=nil)
-   url = feature_url(hostname, feature)
+ def connect(hostname, feature=nil, sub_feature=nil)
+   url = feature_url(hostname, feature, sub_feature)
    logger.debug("Connecting to '#{url}' with headers #{connection_headers}...")
    EventMachine::HttpRequest.new(url, connection_headers)
  end
  # Returns the HTTP headers common to each valid PowerTrack request.
@@ -262,12 +289,13 @@
  end
  DEFAULT_RULES_REQUEST_OPTIONS = {
    ok: DEFAULT_OK_RESPONSE_STATUS,
    headers: {},
+   query: {},
    body: nil
- }
+ }.freeze
  # Makes a rules-related request with a specific HTTP verb and a few options.
  # Returns the response if successful or an exception if the request failed.
  def make_rules_request(verb, options=nil)
    options = DEFAULT_RULES_REQUEST_OPTIONS.merge(options || {})
@@ -277,10 +305,11 @@
    EM.run do
      con = connect('api', 'rules')
      http = con.setup_request(verb,
        head: rules_req_headers.merge(options[:headers]),
+       query: options[:query],
        body: options[:body])
      http.errback do
        resp_error = http.error
        EM.stop
@@ -313,14 +342,14 @@
  def track_req_headers(compressed)
    common_req_headers.merge('connection' => 'keep-alive')
      .merge(gzip_compressed_header(compressed))
  end
- # Connects to the /track endpoint and manages reconnections when being
- # disconnected.
+ # Connects to the /track endpoint.
  def track_once(options, retrier)
    logger.info "Starting tracker for retry ##{retrier.retries}..."
+   backfill_minutes = options[:backfill_minutes]
    stop_timeout = options[:stop_timeout]
    on_heartbeat = options[:on_heartbeat]
    on_message = options[:on_message]
    on_activity = options[:on_activity]
    on_system = options[:on_system]
@@ -334,25 +363,32 @@
    resp_body = nil
    EM.run do
      logger.info "Starting the reactor..."
      con = connect('stream')
-     get_opts = { head: track_req_headers(options[:compressed]) }
+     get_opts = {
+       head: track_req_headers(options[:compressed]),
+       query: {}
+     }
      # add a timeframe in replay mode
      if @replay
        now = Time.now
        # start 1 hour ago by default
        from = options[:from] || (now - 60*60)
        # stop 30 minutes ago by default
        to = options[:to] || (now - 30*60)
-       get_opts[:query] = {
+       get_opts[:query].merge!({
          'fromDate' => from.utc.strftime(REPLAY_TIMESTAMP_FORMAT),
          'toDate' => to.utc.strftime(REPLAY_TIMESTAMP_FORMAT)
-       }
+       })
        logger.info "Replay mode enabled from '#{from}' to '#{to}'"
+     end
+
+     if @v2 && backfill_minutes
+       get_opts[:query]['backfillMinutes'] = backfill_minutes
      end
      http = con.get(get_opts)
      # polls to see if the connection should be closed