lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.5.4 vs lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.6.0

- old
+ new

@@ -1,5 +1,9 @@ +require 'twitter' +require 'nkf' +require 'string/scrub' if RUBY_VERSION.to_f < 2.1 + require "fluent/input" module Fluent class TwitterInput < Fluent::Input TIMELINE_TYPE = %w(userstream sampling location tracking) @@ -11,12 +15,12 @@ define_method("router") { Fluent::Engine } end config_param :consumer_key, :string, :secret => true config_param :consumer_secret, :string, :secret => true - config_param :oauth_token, :string, :secret => true - config_param :oauth_token_secret, :string, :secret => true + config_param :access_token, :string, :secret => true + config_param :access_token_secret, :string, :secret => true config_param :tag, :string config_param :timeline, :string config_param :keyword, :string, :default => nil config_param :follow_ids, :string, :default => nil config_param :locations, :string, :default => nil @@ -24,13 +28,10 @@ config_param :output_format, :string, :default => 'simple' config_param :flatten_separator, :string, :default => '_' def initialize super - require 'tweetstream' - require 'nkf' - require 'string/scrub' if RUBY_VERSION.to_f < 2.1 end def configure(conf) super @@ -41,89 +42,79 @@ raise Fluent::ConfigError, "output_format value undefined #{@output_format}" end @keyword = @keyword.gsub('${hashtag}', '#') unless @keyword.nil? - TweetStream.configure do |config| + @client = Twitter::Streaming::Client.new do |config| config.consumer_key = @consumer_key config.consumer_secret = @consumer_secret - config.oauth_token = @oauth_token - config.oauth_token_secret = @oauth_token_secret - config.auth_method = :oauth + config.access_token = @access_token + config.access_token_secret = @access_token_secret end end def start @thread = Thread.new(&method(:run)) - @any = Proc.new do |hash| - get_message(hash) if is_message?(hash) - end end def shutdown Thread.kill(@thread) end def run - client = get_twitter_connection + notice = "twitter: starting Twitter Streaming API for #{@timeline}." + notice << " tag:#{@tag}" + notice << " lang:#{@lang}" unless @lang.nil? + notice << " keyword:#{@keyword}" unless @keyword.nil? + notice << " follow:#{@follow_ids}" unless @follow_ids.nil? && !@keyword.nil? + $log.info notice + if ['sampling', 'tracking'].include?(@timeline) && @keyword - client.track(@keyword) + @client.filter(track: @keyword, &method(:handle_object)) elsif @timeline == 'tracking' && @follow_ids - client.follow(@follow_ids) - elsif @timeline == 'location' && @locations - client.locations(@locations) + @client.filter(follow: @follow_ids, &method(:handle_object)) elsif @timeline == 'sampling' && @keyword.nil? && @follow_ids.nil? - client.sample + @client.sample(&method(:handle_object)) elsif @timeline == 'userstream' - client.userstream + @client.user(&method(:handle_object)) end end - def get_twitter_connection - notice = "twitter: starting Twitter Streaming API for #{@timeline}." - notice << " tag:#{@tag}" - notice << " lang:#{@lang}" unless @lang.nil? - notice << " keyword:#{@keyword}" unless @keyword.nil? - notice << " follow:#{@follow_ids}" unless @follow_ids.nil? && !@keyword.nil? - $log.info notice - client = TweetStream::Client.new - client.on_anything(&@any) - client.on_error do |message| - $log.info "twitter: unexpected error has occured. #{message}" + def handle_object(object) + if is_message?(object) + get_message(object) end - return client end - def is_message?(status) - return false if !status.include?(:text) - return false if !status.include?(:user) - return false if (!@lang.nil? && @lang != '') && !@lang.include?(status[:user][:lang]) + def is_message?(tweet) + return false if !tweet.is_a?(Twitter::Tweet) + return false if (!@lang.nil? && @lang != '') && !@lang.include?(tweet.user.lang) if @timeline == 'userstream' && (!@keyword.nil? && @keyword != '') pattern = NKF::nkf('-WwZ1', @keyword).gsub(/,\s?/, '|') - tweet = NKF::nkf('-WwZ1', status[:text]) + tweet = NKF::nkf('-WwZ1', tweet.text) return false if !Regexp.new(pattern, Regexp::IGNORECASE).match(tweet) end return true end - def get_message(status) + def get_message(tweet) case @output_format when 'nest' - record = hash_key_to_s(status) + record = hash_key_to_s(tweet.to_h) when 'flat' - record = hash_flatten(status) + record = hash_flatten(tweet.to_h) when 'simple' record = Hash.new - record.store('message', status[:text]).scrub('') - record.store('coordinates', status[:coordinates]) - record.store('place', status[:place]) - record.store('created_at', status[:created_at]) - record.store('user_name', status[:user][:name]) - record.store('user_screen_name', status[:user][:screen_name]) - record.store('user_profile_image_url', status[:user][:profile_image_url]) - record.store('user_time_zone', status[:user][:time_zone]) - record.store('user_lang', status[:user][:lang]) + record.store('message', tweet.text).scrub('') + record.store('geo', tweet.geo) + record.store('place', tweet.place) + record.store('created_at', tweet.created_at) + record.store('user_name', tweet.user.name) + record.store('user_screen_name', tweet.user.screen_name) + record.store('user_profile_image_url', tweet.user.profile_image_url) + record.store('user_time_zone', tweet.user.time_zone) + record.store('user_lang', tweet.user.lang) end router.emit(@tag, Engine.now, record) end def hash_flatten(record, prefix = nil) @@ -166,7 +157,16 @@ else v end end end + end +end + +# TODO: Remove this monkey patch after release new version of twitter gem +# +# See: https://github.com/sferik/twitter/pull/815 +class Twitter::NullObject + def to_json(*args) + nil.to_json(*args) end end