lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.3.2 vs lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.4.0
- old
+ new
@@ -1,18 +1,19 @@
module Fluent
class TwitterInput < Fluent::Input
- TIMELINE_TYPE = %w(userstream sampling)
+ TIMELINE_TYPE = %w(userstream sampling tracking)
OUTPUT_FORMAT_TYPE = %w(nest flat simple)
Plugin.register_input('twitter', self)
config_param :consumer_key, :string
config_param :consumer_secret, :string
config_param :oauth_token, :string
config_param :oauth_token_secret, :string
config_param :tag, :string
config_param :timeline, :string
config_param :keyword, :string, :default => nil
+ config_param :follow_ids, :string, :default => nil
config_param :lang, :string, :default => nil
config_param :output_format, :string, :default => 'simple'
config_param :flatten_separator, :string, :default => '_'
def initialize
@@ -53,26 +54,27 @@
Thread.kill(@thread)
end
def run
client = get_twitter_connection
- if @timeline == 'sampling' && @keyword
+ if ['sampling', 'tracking'].include?(@timeline) && @keyword
client.track(@keyword)
- elsif @timeline == 'sampling' && @keyword.nil?
+ elsif @timeline == 'tracking' && @follow_ids
+ client.follow(@follow_ids)
+ elsif @timeline == 'sampling' && @keyword.nil? && @follow_ids.nil?
client.sample
elsif @timeline == 'userstream'
client.userstream
- #elsif @timeline == 'follow'
- # client.follow(@follow_ids)
end
end
def get_twitter_connection
notice = "twitter: starting Twitter Streaming API for #{@timeline}."
notice << " tag:#{@tag}"
notice << " lang:#{@lang}" unless @lang.nil?
notice << " keyword:#{@keyword}" unless @keyword.nil?
+ notice << " follow:#{@follow_ids}" unless @follow_ids.nil? && !@keyword.nil?
$log.info notice
client = TweetStream::Client.new
client.on_anything(&@any)
client.on_error do |message|
$log.info "twitter: unexpected error has occured. #{message}"
@@ -98,11 +100,11 @@
record = hash_key_to_s(status)
when 'flat'
record = hash_flatten(status)
when 'simple'
record = Hash.new
- record.store('message', status[:text])
+ record.store('message', status[:text]).scrub('')
record.store('geo', status[:geo])
record.store('place', status[:place])
record.store('created_at', status[:created_at])
record.store('user_name', status[:user][:name])
record.store('user_screen_name', status[:user][:screen_name])
@@ -114,12 +116,14 @@
end
def hash_flatten(record, prefix = nil)
record.inject({}) do |d, (k, v)|
k = prefix.to_s + k.to_s
- if v.is_a?(Hash)
+ if v.instance_of?(Hash)
d.merge(hash_flatten(v, k + @flatten_separator))
+ elsif v.instance_of?(String)
+ d.merge(k => v.scrub(""))
else
d.merge(k => v)
end
end
end
@@ -129,10 +133,12 @@
hash.each do |k, v|
if v.instance_of?(Hash) then
newhash[k.to_s] = hash_key_to_s(v)
elsif v.instance_of?(Array) then
newhash[k.to_s] = array_key_to_s(v)
+ elsif v.instance_of?(String)
+ newhash[k.to_s] = v.scrub('')
else
newhash[k.to_s] = v
end
end
newhash
@@ -142,9 +148,11 @@
array.map do |v|
if v.instance_of?(Hash) then
hash_key_to_s(v)
elsif v.instance_of?(Array) then
array_key_to_s(v)
+ elsif v.instance_of?(String) then
+ v.scrub('')
else
v
end
end
end