lib/powertrack/streaming/stream.rb in powertrack-1.1.1 vs lib/powertrack/streaming/stream.rb in powertrack-1.2.0
- old
+ new
@@ -16,72 +16,92 @@
include PowerTrack::API
# Includes a logger, void by default
include VoidLogger::LoggerMixin
# The format of the URLs to connect to the various stream services
- FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze
+ FEATURE_URL_FORMAT = {
+ # [ hostname, account, source, mode, label, feature ]
+ v1: "https://%s.gnip.com/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze,
+ # [ hostname, feature, account, source, label, sub-feature ]
+ v2: "https://gnip-%s.twitter.com/%s/powertrack/accounts/%s/publishers/%s/%s%s.json".freeze
+ }.freeze
# The default timeout on a connection to PowerTrack. Can be overriden per call.
DEFAULT_CONNECTION_TIMEOUT = 30
# The default timeout for inactivity on a connection to PowerTrack. Can be
# overriden per call.
DEFAULT_INACTIVITY_TIMEOUT = 50
# The default options for using the stream.
DEFAULT_STREAM_OPTIONS = {
+ # enable PowerTrack v2 API (using v1 by default)
+ v2: false,
+ # override the default connection timeout
connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
+ # override the default inactivity timeout
inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
- # use a client id if you want to leverage the Backfill feature
+ # use a client id if you want to leverage the Backfill feature in v1
client_id: nil,
# enable the replay mode to get activities over the last 5 days
# see http://support.gnip.com/apis/replay/api_reference.html
replay: false
- }
+ }.freeze
DEFAULT_OK_RESPONSE_STATUS = 200
# The patterns used to identify the various types of message received from GNIP
# everything else is an activity
- HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/
- SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi
+ HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/.freeze
+ SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi.freeze
# The format used to send UTC timestamps in Replay mode
- REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'
+ REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'.freeze
attr_reader :username, :account_name, :data_source, :label
def initialize(username, password, account_name, data_source, label, options=nil)
@username = username
@password = password
@account_name = account_name
@data_source = data_source
@label = label
@options = DEFAULT_STREAM_OPTIONS.merge(options || {})
- @client_id = @options[:client_id]
@replay = !!@options[:replay]
+ @client_id = @options[:client_id]
@stream_mode = @replay ? 'replay' : 'streams'
+
+ # force v1 if Replay activated
+ @v2 = !@replay && !!@options[:v2]
end
# Adds many rules to your PowerTrack stream’s ruleset.
#
# <tt>POST /rules</tt>
#
# See http://support.gnip.com/apis/powertrack/api_reference.html#AddRules
def add_rules(*rules)
# flatten the rules in case it was provided as an array
- make_rules_request(:post, body: MultiJson.encode('rules' => rules.flatten), ok: 201)
+ make_rules_request(:post,
+ body: MultiJson.encode('rules' => rules.flatten),
+ ok: 201)
end
# Removes the specified rules from the stream.
#
# <tt>DELETE /rules</tt>
#
# See http://support.gnip.com/apis/powertrack/api_reference.html#DeleteRules
def delete_rules(*rules)
+ # v2 does not use DELETE anymore
+ delete_verb = @v2 ? :post : :delete
# flatten the rules in case it was provided as an array
- make_rules_request(:delete, body: MultiJson.encode('rules' => rules.flatten))
+ delete_options = { body: MultiJson.encode('rules' => rules.flatten) }
+ # v2 uses a query parameter
+ delete_options[:query] = { '_method' => 'delete' } if @v2
+
+ make_rules_request(delete_verb, delete_options)
end
DEFAULT_LIST_RULES_OPTIONS = {
compressed: true,
objectify: true
@@ -101,11 +121,13 @@
# return Rule objects when required and feasible/appropriate
if options[:objectify] &&
res.is_a?(Hash) &&
(rules = res['rules']).is_a?(Array) &&
rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') }
- rules.map { |rule| PowerTrack::Rule.new(rule['value'], rule['tag']) }
+ rules.map do |rule|
+ PowerTrack::Rule.new(rule['value'], tag: rule['tag'], id: rule['id'])
+ end
else
res
end
end
@@ -123,10 +145,12 @@
raw: false,
# the starting date from which the activities will be recovered (replay mode only)
from: nil,
# the ending date to which the activities will be recovered (replay mode only)
to: nil,
+ # specify a number of minutes to leverage the Backfill feature (v2 only)
+ backfill_minutes: nil,
# called for each message received, except heartbeats
on_message: nil,
# called for each activity received
on_activity: nil,
# called for each system message received
@@ -138,10 +162,12 @@
}.freeze
# Establishes a persistent connection to the PowerTrack data stream,
# through which the social data will be delivered.
#
+ # Manages reconnections when being disconnected.
+ #
# <tt>GET /track/:stream</tt>
#
# See http://support.gnip.com/apis/powertrack/api_reference.html#Stream
def track(options=nil)
options = DEFAULT_TRACK_OPTIONS.merge(options || {})
@@ -149,34 +175,35 @@
handle_api_response(*retrier.retry { track_once(options, retrier) })
end
private
- # Returns the fully-qualified domain name of a GNIP PowerTrack server
- # based on a hostname.
- def gnip_server_name(hostname)
- "%s.gnip.com" % [ hostname ]
- end
-
- # Returns the port used by GNIP PowerTrack servers.
- def gnip_server_port
- '443'
- end
-
# Returns the URL of the stream for a given feature.
- def feature_url(hostname, feature=nil)
- feature = feature ? "/#{feature}" : ''
- _url = FEATURE_URL_FORMAT %
- [ gnip_server_name(hostname),
- gnip_server_port,
- @account_name,
- @data_source,
- @stream_mode,
- @label,
- feature ]
+ def feature_url(hostname, feature=nil, sub_feature=nil)
+ _url = nil
+ if @v2
+ feature ||= hostname
+ sub_feature = sub_feature ? "/#{sub_feature}" : ''
+ _url = FEATURE_URL_FORMAT[:v2] %
+ [ hostname,
+ feature,
+ @account_name,
+ @data_source,
+ @label,
+ sub_feature ]
+ else
+ feature = feature ? "/#{feature}" : ''
+ _url = FEATURE_URL_FORMAT[:v1] %
+ [ hostname,
+ @account_name,
+ @data_source,
+ @stream_mode,
+ @label,
+ feature ]
- _url += "?client=#{@client_id}" if @client_id
+ _url += "?client=#{@client_id}" if @client_id
+ end
_url
end
# Returns the HTTP header that turns on GZip-based compression if required.
@@ -196,12 +223,12 @@
{ connect_timeout: @options[:connect_timeout],
inactivity_timeout: @options[:inactivity_timeout] }
end
# Opens a new connection to GNIP PowerTrack.
- def connect(hostname, feature=nil)
- url = feature_url(hostname, feature)
+ def connect(hostname, feature=nil, sub_feature=nil)
+ url = feature_url(hostname, feature, sub_feature)
logger.debug("Connecting to '#{url}' with headers #{connection_headers}...")
EventMachine::HttpRequest.new(url, connection_headers)
end
# Returns the HTTP headers common to each valid PowerTrack request.
@@ -262,12 +289,13 @@
end
DEFAULT_RULES_REQUEST_OPTIONS = {
ok: DEFAULT_OK_RESPONSE_STATUS,
headers: {},
+ query: {},
body: nil
- }
+ }.freeze
# Makes a rules-related request with a specific HTTP verb and a few options.
# Returns the response if successful or an exception if the request failed.
def make_rules_request(verb, options=nil)
options = DEFAULT_RULES_REQUEST_OPTIONS.merge(options || {})
@@ -277,10 +305,11 @@
EM.run do
con = connect('api', 'rules')
http = con.setup_request(verb,
head: rules_req_headers.merge(options[:headers]),
+ query: options[:query],
body: options[:body])
http.errback do
resp_error = http.error
EM.stop
@@ -313,14 +342,14 @@
def track_req_headers(compressed)
common_req_headers.merge('connection' => 'keep-alive')
.merge(gzip_compressed_header(compressed))
end
- # Connects to the /track endpoint and manages reconnections when being
- # disconnected.
+ # Connects to the /track endpoint.
def track_once(options, retrier)
logger.info "Starting tracker for retry ##{retrier.retries}..."
+ backfill_minutes = options[:backfill_minutes]
stop_timeout = options[:stop_timeout]
on_heartbeat = options[:on_heartbeat]
on_message = options[:on_message]
on_activity = options[:on_activity]
on_system = options[:on_system]
@@ -334,25 +363,32 @@
resp_body = nil
EM.run do
logger.info "Starting the reactor..."
con = connect('stream')
- get_opts = { head: track_req_headers(options[:compressed]) }
+ get_opts = {
+ head: track_req_headers(options[:compressed]),
+ query: {}
+ }
# add a timeframe in replay mode
if @replay
now = Time.now
# start 1 hour ago by default
from = options[:from] || (now - 60*60)
# stop 30 minutes ago by default
to = options[:to] || (now - 30*60)
- get_opts[:query] = {
+ get_opts[:query].merge!({
'fromDate' => from.utc.strftime(REPLAY_TIMESTAMP_FORMAT),
'toDate' => to.utc.strftime(REPLAY_TIMESTAMP_FORMAT)
- }
+ })
logger.info "Replay mode enabled from '#{from}' to '#{to}'"
+ end
+
+ if @v2 && backfill_minutes
+ get_opts[:query]['backfillMinutes'] = backfill_minutes
end
http = con.get(get_opts)
# polls to see if the connection should be closed