lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.1.1 vs lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.2.1
- old
+ new
@@ -1,27 +1,37 @@
module Fluent
class TwitterInput < Fluent::Input
TIMELINE_TYPE = %w(userstream sampling)
+ OUTPUT_FORMAT_TYPE = %w(nest flat simple)
Plugin.register_input('twitter', self)
config_param :consumer_key, :string
config_param :consumer_secret, :string
config_param :oauth_token, :string
config_param :oauth_token_secret, :string
config_param :tag, :string
config_param :timeline, :string
config_param :keyword, :string, :default => nil
- config_param :lang, :string, :default => ''
+ config_param :lang, :string, :default => nil
+ config_param :output_format, :string, :default => 'simple'
+ config_param :flatten_separator, :string, :default => '_'
def initialize
super
require 'tweetstream'
end
def configure(conf)
super
- raise Fluent::ConfigError, "timeline value undefined #{@timeline}" if !TIMELINE_TYPE.include?(@timeline)
+
+ if !TIMELINE_TYPE.include?(@timeline)
+ raise Fluent::ConfigError, "timeline value undefined #{@timeline}"
+ end
+ if !OUTPUT_FORMAT_TYPE.include?(@output_format)
+ raise Fluent::ConfigError, "output_format value undefined #{@output_format}"
+ end
+
TweetStream.configure do |config|
config.consumer_key = @consumer_key
config.consumer_secret = @consumer_secret
config.oauth_token = @oauth_token
config.oauth_token_secret = @oauth_token_secret
@@ -29,66 +39,81 @@
end
end
def start
@thread = Thread.new(&method(:run))
+ @any = Proc.new do |hash|
+ get_message(hash) if is_message?(hash)
+ end
end
def shutdown
Thread.kill(@thread)
end
def run
+ client = get_twitter_connection
if @timeline == 'sampling' && @keyword
- start_twitter_track
+ client.track(@keyword)
elsif @timeline == 'sampling' && @keyword.nil?
- start_twitter_sample
+ client.sample
elsif @timeline == 'userstream'
- start_twitter_userstream
+ client.userstream
+ #elsif @timeline == 'follow'
+ # client.follow(@follow_ids)
end
end
- def start_twitter_track
- $log.info "starting twitter keyword tracking. tag:#{@tag} lang:#{@lang} keyword:#{@keyword}"
+ def get_twitter_connection
+ notice = "twitter: starting Twitter Streaming API for #{@timeline}."
+ notice << " tag:#{@tag}"
+ notice << " lang:#{@lang}" unless @lang.nil?
+ notice << " keyword:#{@keyword}" unless @keyword.nil?
+ $log.info notice
client = TweetStream::Client.new
- client.track(@keyword) do |status|
- next unless status.text
- next unless @lang.include?(status.user.lang)
- get_message(status)
+ client.on_anything(&@any)
+ client.on_error do |message|
+ $log.info "twitter: unexpected error has occured. #{message}"
end
+ return client
end
- def start_twitter_sample
- $log.info "starting twitter sampled streaming. tag:#{@tag} lang:#{@lang}"
- client = TweetStream::Client.new
- client.sample do |status|
- next unless status.text
- next unless @lang.include?(status.user.lang)
- get_message(status)
- end
+ def is_message?(status)
+ return false if !status.include?(:text)
+ return false if !status.include?(:user)
+ return false if (!@lang.nil? && @lang != '') && !@lang.include?(status[:user][:lang])
+ return true
end
- def start_twitter_userstream
- $log.info "starting twitter userstream tracking. tag:#{@tag} lang:#{@lang}"
- client = TweetStream::Client.new
- client.userstream do |status|
- next unless status.text
- next unless @lang.include?(status.user.lang)
- get_message(status)
+ def get_message(status)
+ case @output_format
+ when 'nest'
+ record = status.inject({}){|f,(k,v)| f[k.to_s] = v; f}
+ when 'flat'
+ record = hash_flatten(status)
+ when 'simple'
+ record = Hash.new
+ record.store('message', status[:text])
+ record.store('geo', status[:geo])
+ record.store('place', status[:place])
+ record.store('created_at', status[:place])
+ record.store('user_name', status[:user][:name])
+ record.store('user_screen_name', status[:user][:screen_name])
+ record.store('user_profile_image_url', status[:user][:profile_image_url])
+ record.store('user_time_zone', status[:user][:time_zone])
+ record.store('user_lang', status[:user][:lang])
end
+ Engine.emit(@tag, Engine.now, record)
end
- def get_message(status)
- record = Hash.new
- record.store('message', status.text)
- record.store('geo', status.geo)
- record.store('place', status.place)
- record.store('created_at', status.created_at)
- record.store('user_name', status.user.name)
- record.store('user_screen_name', status.user.screen_name)
- record.store('user_profile_image_url', status.user.profile_image_url)
- record.store('user_time_zone', status.user.time_zone)
- record.store('user_lang', status.user.lang)
- Engine.emit(@tag, Engine.now, record)
+ def hash_flatten(record, prefix = nil)
+ record.inject({}) do |d, (k, v)|
+ k = prefix.to_s + k.to_s
+ if v.is_a?(Hash)
+ d.merge(hash_flatten(v, k + @flatten_separator))
+ else
+ d.merge(k => v)
+ end
+ end
end
end
end