lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.1.1 vs lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.2.1

- old
+ new

@@ -1,27 +1,37 @@ module Fluent class TwitterInput < Fluent::Input TIMELINE_TYPE = %w(userstream sampling) + OUTPUT_FORMAT_TYPE = %w(nest flat simple) Plugin.register_input('twitter', self) config_param :consumer_key, :string config_param :consumer_secret, :string config_param :oauth_token, :string config_param :oauth_token_secret, :string config_param :tag, :string config_param :timeline, :string config_param :keyword, :string, :default => nil - config_param :lang, :string, :default => '' + config_param :lang, :string, :default => nil + config_param :output_format, :string, :default => 'simple' + config_param :flatten_separator, :string, :default => '_' def initialize super require 'tweetstream' end def configure(conf) super - raise Fluent::ConfigError, "timeline value undefined #{@timeline}" if !TIMELINE_TYPE.include?(@timeline) + + if !TIMELINE_TYPE.include?(@timeline) + raise Fluent::ConfigError, "timeline value undefined #{@timeline}" + end + if !OUTPUT_FORMAT_TYPE.include?(@output_format) + raise Fluent::ConfigError, "output_format value undefined #{@output_format}" + end + TweetStream.configure do |config| config.consumer_key = @consumer_key config.consumer_secret = @consumer_secret config.oauth_token = @oauth_token config.oauth_token_secret = @oauth_token_secret @@ -29,66 +39,81 @@ end end def start @thread = Thread.new(&method(:run)) + @any = Proc.new do |hash| + get_message(hash) if is_message?(hash) + end end def shutdown Thread.kill(@thread) end def run + client = get_twitter_connection if @timeline == 'sampling' && @keyword - start_twitter_track + client.track(@keyword) elsif @timeline == 'sampling' && @keyword.nil? - start_twitter_sample + client.sample elsif @timeline == 'userstream' - start_twitter_userstream + client.userstream + #elsif @timeline == 'follow' + # client.follow(@follow_ids) end end - def start_twitter_track - $log.info "starting twitter keyword tracking. tag:#{@tag} lang:#{@lang} keyword:#{@keyword}" + def get_twitter_connection + notice = "twitter: starting Twitter Streaming API for #{@timeline}." + notice << " tag:#{@tag}" + notice << " lang:#{@lang}" unless @lang.nil? + notice << " keyword:#{@keyword}" unless @keyword.nil? + $log.info notice client = TweetStream::Client.new - client.track(@keyword) do |status| - next unless status.text - next unless @lang.include?(status.user.lang) - get_message(status) + client.on_anything(&@any) + client.on_error do |message| + $log.info "twitter: unexpected error has occured. #{message}" end + return client end - def start_twitter_sample - $log.info "starting twitter sampled streaming. tag:#{@tag} lang:#{@lang}" - client = TweetStream::Client.new - client.sample do |status| - next unless status.text - next unless @lang.include?(status.user.lang) - get_message(status) - end + def is_message?(status) + return false if !status.include?(:text) + return false if !status.include?(:user) + return false if (!@lang.nil? && @lang != '') && !@lang.include?(status[:user][:lang]) + return true end - def start_twitter_userstream - $log.info "starting twitter userstream tracking. tag:#{@tag} lang:#{@lang}" - client = TweetStream::Client.new - client.userstream do |status| - next unless status.text - next unless @lang.include?(status.user.lang) - get_message(status) + def get_message(status) + case @output_format + when 'nest' + record = status.inject({}){|f,(k,v)| f[k.to_s] = v; f} + when 'flat' + record = hash_flatten(status) + when 'simple' + record = Hash.new + record.store('message', status[:text]) + record.store('geo', status[:geo]) + record.store('place', status[:place]) + record.store('created_at', status[:place]) + record.store('user_name', status[:user][:name]) + record.store('user_screen_name', status[:user][:screen_name]) + record.store('user_profile_image_url', status[:user][:profile_image_url]) + record.store('user_time_zone', status[:user][:time_zone]) + record.store('user_lang', status[:user][:lang]) end + Engine.emit(@tag, Engine.now, record) end - def get_message(status) - record = Hash.new - record.store('message', status.text) - record.store('geo', status.geo) - record.store('place', status.place) - record.store('created_at', status.created_at) - record.store('user_name', status.user.name) - record.store('user_screen_name', status.user.screen_name) - record.store('user_profile_image_url', status.user.profile_image_url) - record.store('user_time_zone', status.user.time_zone) - record.store('user_lang', status.user.lang) - Engine.emit(@tag, Engine.now, record) + def hash_flatten(record, prefix = nil) + record.inject({}) do |d, (k, v)| + k = prefix.to_s + k.to_s + if v.is_a?(Hash) + d.merge(hash_flatten(v, k + @flatten_separator)) + else + d.merge(k => v) + end + end end end end