lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.3.2 vs lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.4.0

- old
+ new

@@ -1,18 +1,19 @@ module Fluent class TwitterInput < Fluent::Input - TIMELINE_TYPE = %w(userstream sampling) + TIMELINE_TYPE = %w(userstream sampling tracking) OUTPUT_FORMAT_TYPE = %w(nest flat simple) Plugin.register_input('twitter', self) config_param :consumer_key, :string config_param :consumer_secret, :string config_param :oauth_token, :string config_param :oauth_token_secret, :string config_param :tag, :string config_param :timeline, :string config_param :keyword, :string, :default => nil + config_param :follow_ids, :string, :default => nil config_param :lang, :string, :default => nil config_param :output_format, :string, :default => 'simple' config_param :flatten_separator, :string, :default => '_' def initialize @@ -53,26 +54,27 @@ Thread.kill(@thread) end def run client = get_twitter_connection - if @timeline == 'sampling' && @keyword + if ['sampling', 'tracking'].include?(@timeline) && @keyword client.track(@keyword) - elsif @timeline == 'sampling' && @keyword.nil? + elsif @timeline == 'tracking' && @follow_ids + client.follow(@follow_ids) + elsif @timeline == 'sampling' && @keyword.nil? && @follow_ids.nil? client.sample elsif @timeline == 'userstream' client.userstream - #elsif @timeline == 'follow' - # client.follow(@follow_ids) end end def get_twitter_connection notice = "twitter: starting Twitter Streaming API for #{@timeline}." notice << " tag:#{@tag}" notice << " lang:#{@lang}" unless @lang.nil? notice << " keyword:#{@keyword}" unless @keyword.nil? + notice << " follow:#{@follow_ids}" unless @follow_ids.nil? && !@keyword.nil? $log.info notice client = TweetStream::Client.new client.on_anything(&@any) client.on_error do |message| $log.info "twitter: unexpected error has occured. #{message}" @@ -98,11 +100,11 @@ record = hash_key_to_s(status) when 'flat' record = hash_flatten(status) when 'simple' record = Hash.new - record.store('message', status[:text]) + record.store('message', status[:text]).scrub('') record.store('geo', status[:geo]) record.store('place', status[:place]) record.store('created_at', status[:created_at]) record.store('user_name', status[:user][:name]) record.store('user_screen_name', status[:user][:screen_name]) @@ -114,12 +116,14 @@ end def hash_flatten(record, prefix = nil) record.inject({}) do |d, (k, v)| k = prefix.to_s + k.to_s - if v.is_a?(Hash) + if v.instance_of?(Hash) d.merge(hash_flatten(v, k + @flatten_separator)) + elsif v.instance_of?(String) + d.merge(k => v.scrub("")) else d.merge(k => v) end end end @@ -129,10 +133,12 @@ hash.each do |k, v| if v.instance_of?(Hash) then newhash[k.to_s] = hash_key_to_s(v) elsif v.instance_of?(Array) then newhash[k.to_s] = array_key_to_s(v) + elsif v.instance_of?(String) + newhash[k.to_s] = v.scrub('') else newhash[k.to_s] = v end end newhash @@ -142,9 +148,11 @@ array.map do |v| if v.instance_of?(Hash) then hash_key_to_s(v) elsif v.instance_of?(Array) then array_key_to_s(v) + elsif v.instance_of?(String) then + v.scrub('') else v end end end