lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.5.4 vs lib/fluent/plugin/in_twitter.rb in fluent-plugin-twitter-0.6.0
- old
+ new
@@ -1,5 +1,9 @@
+require 'twitter'
+require 'nkf'
+require 'string/scrub' if RUBY_VERSION.to_f < 2.1
+
require "fluent/input"
module Fluent
class TwitterInput < Fluent::Input
TIMELINE_TYPE = %w(userstream sampling location tracking)
@@ -11,12 +15,12 @@
define_method("router") { Fluent::Engine }
end
config_param :consumer_key, :string, :secret => true
config_param :consumer_secret, :string, :secret => true
- config_param :oauth_token, :string, :secret => true
- config_param :oauth_token_secret, :string, :secret => true
+ config_param :access_token, :string, :secret => true
+ config_param :access_token_secret, :string, :secret => true
config_param :tag, :string
config_param :timeline, :string
config_param :keyword, :string, :default => nil
config_param :follow_ids, :string, :default => nil
config_param :locations, :string, :default => nil
@@ -24,13 +28,10 @@
config_param :output_format, :string, :default => 'simple'
config_param :flatten_separator, :string, :default => '_'
# Constructor; only delegates to the parent class.
# The lazy `require` calls that used to live here are removed on the new side
# of this diff: 'nkf' and 'string/scrub' are now required at the top of the
# file, and 'twitter' is required there in place of 'tweetstream'.
def initialize
super
- require 'tweetstream'
- require 'nkf'
- require 'string/scrub' if RUBY_VERSION.to_f < 2.1
end
def configure(conf)
super
@@ -41,89 +42,79 @@
raise Fluent::ConfigError, "output_format value undefined #{@output_format}"
end
@keyword = @keyword.gsub('${hashtag}', '#') unless @keyword.nil?
- TweetStream.configure do |config|
+ @client = Twitter::Streaming::Client.new do |config|
config.consumer_key = @consumer_key
config.consumer_secret = @consumer_secret
- config.oauth_token = @oauth_token
- config.oauth_token_secret = @oauth_token_secret
- config.auth_method = :oauth
+ config.access_token = @access_token
+ config.access_token_secret = @access_token_secret
end
end
# Spawns the background thread that executes #run and keeps it in @thread.
# The @any callback proc built here on the old side is removed; the new side
# passes method(:handle_object) directly to the client calls in #run.
def start
@thread = Thread.new(&method(:run))
- @any = Proc.new do |hash|
- get_message(hash) if is_message?(hash)
- end
end
# Stops the input by killing the thread stored in @thread.
# NOTE(review): assumes #start has already run so that @thread is set — confirm.
def shutdown
Thread.kill(@thread)
end
# Thread body started by #start: logs the effective settings, then opens the
# streaming endpoint that matches @timeline and hands every received object
# to #handle_object.
def run
- client = get_twitter_connection
+ notice = "twitter: starting Twitter Streaming API for #{@timeline}."
+ notice << " tag:#{@tag}"
+ notice << " lang:#{@lang}" unless @lang.nil?
+ notice << " keyword:#{@keyword}" unless @keyword.nil?
# Mention follow ids only when some are configured. The previous condition
# (`unless @follow_ids.nil? && !@keyword.nil?`) appended an empty " follow:"
# whenever both @follow_ids and @keyword were nil.
+ notice << " follow:#{@follow_ids}" unless @follow_ids.nil?
+ $log.info notice
+
# Branch order matters: a configured keyword wins over follow ids.
if ['sampling', 'tracking'].include?(@timeline) && @keyword
- client.track(@keyword)
+ @client.filter(track: @keyword, &method(:handle_object))
elsif @timeline == 'tracking' && @follow_ids
- client.follow(@follow_ids)
- elsif @timeline == 'location' && @locations
- client.locations(@locations)
+ @client.filter(follow: @follow_ids, &method(:handle_object))
elsif @timeline == 'sampling' && @keyword.nil? && @follow_ids.nil?
- client.sample
+ @client.sample(&method(:handle_object))
elsif @timeline == 'userstream'
- client.userstream
+ @client.user(&method(:handle_object))
+ else
# No branch matched (for example timeline 'location', whose handler is removed
# on the new side); report it instead of letting the thread end silently.
+ $log.warn "twitter: no streaming endpoint matches timeline '#{@timeline}' with the given keyword/follow_ids; nothing will be received."
end
end
# Old side: get_twitter_connection logged the startup notice, created a
# TweetStream::Client, registered the on_anything and on_error callbacks and
# returned the client.
# New side: handle_object is the block handed to the streaming calls in #run;
# it passes an object on to #get_message only when #is_message? accepts it.
# The on_error logging callback has no counterpart on the new side.
- def get_twitter_connection
- notice = "twitter: starting Twitter Streaming API for #{@timeline}."
- notice << " tag:#{@tag}"
- notice << " lang:#{@lang}" unless @lang.nil?
- notice << " keyword:#{@keyword}" unless @keyword.nil?
- notice << " follow:#{@follow_ids}" unless @follow_ids.nil? && !@keyword.nil?
- $log.info notice
- client = TweetStream::Client.new
- client.on_anything(&@any)
- client.on_error do |message|
- $log.info "twitter: unexpected error has occured. #{message}"
+ def handle_object(object)
+ if is_message?(object)
+ get_message(object)
end
- return client
end
# Decides whether a received object should be emitted; returns true or false.
# Old side: required :text and :user keys on the status hash.
# New side: requires the object to be a Twitter::Tweet.
# Both sides then apply the optional @lang filter against the author's
# language and, for the 'userstream' timeline only, the optional @keyword
# filter against the tweet text.
- def is_message?(status)
- return false if !status.include?(:text)
- return false if !status.include?(:user)
- return false if (!@lang.nil? && @lang != '') && !@lang.include?(status[:user][:lang])
+ def is_message?(tweet)
+ return false if !tweet.is_a?(Twitter::Tweet)
# NOTE(review): if @lang is a String, include? is a substring test — confirm.
+ return false if (!@lang.nil? && @lang != '') && !@lang.include?(tweet.user.lang)
if @timeline == 'userstream' && (!@keyword.nil? && @keyword != '')
# Both the keyword list and the text go through the same NKF conversion
# before matching; commas (with an optional following whitespace character)
# in @keyword become regexp alternation.
# NOTE(review): the keywords are not Regexp-escaped — confirm that is intended.
pattern = NKF::nkf('-WwZ1', @keyword).gsub(/,\s?/, '|')
- tweet = NKF::nkf('-WwZ1', status[:text])
# From here on `tweet` holds the converted text String, not the tweet object.
+ tweet = NKF::nkf('-WwZ1', tweet.text)
return false if !Regexp.new(pattern, Regexp::IGNORECASE).match(tweet)
end
return true
end
# Builds the record for one tweet according to @output_format and emits it
# under @tag at the current engine time.
#   'nest'   - the tweet hash passed through hash_key_to_s
#   'flat'   - the tweet hash passed through hash_flatten
#   'simple' - a small hash of selected tweet and user attributes
- def get_message(status)
+ def get_message(tweet)
case @output_format
when 'nest'
- record = hash_key_to_s(status)
+ record = hash_key_to_s(tweet.to_h)
when 'flat'
- record = hash_flatten(status)
+ record = hash_flatten(tweet.to_h)
when 'simple'
record = Hash.new
- record.store('message', status[:text]).scrub('')
- record.store('coordinates', status[:coordinates])
- record.store('place', status[:place])
- record.store('created_at', status[:created_at])
- record.store('user_name', status[:user][:name])
- record.store('user_screen_name', status[:user][:screen_name])
- record.store('user_profile_image_url', status[:user][:profile_image_url])
- record.store('user_time_zone', status[:user][:time_zone])
- record.store('user_lang', status[:user][:lang])
# Scrub the text itself before storing it. The earlier form called
# `.scrub('')` on the return value of Hash#store and threw the result away,
# so invalid byte sequences stayed in the stored message.
+ record.store('message', tweet.text.scrub(''))
# The 'coordinates' key of the old side is replaced by 'geo' here.
+ record.store('geo', tweet.geo)
+ record.store('place', tweet.place)
+ record.store('created_at', tweet.created_at)
+ record.store('user_name', tweet.user.name)
+ record.store('user_screen_name', tweet.user.screen_name)
+ record.store('user_profile_image_url', tweet.user.profile_image_url)
+ record.store('user_time_zone', tweet.user.time_zone)
+ record.store('user_lang', tweet.user.lang)
end
router.emit(@tag, Engine.now, record)
end
def hash_flatten(record, prefix = nil)
@@ -166,7 +157,16 @@
else
v
end
end
end
+ end
+end
+
+# TODO: Remove this monkey patch once a new version of the twitter gem is released.
+#
+# See: https://github.com/sferik/twitter/pull/815
# Reopens Twitter::NullObject to define its JSON serialization.
+class Twitter::NullObject
# Serializes a NullObject exactly as nil would be, forwarding any arguments
# unchanged to nil.to_json.
+ def to_json(*args)
+ nil.to_json(*args)
end
end