lib/weeter/twitter/tweet_consumer.rb in weeter-0.15.0 vs lib/weeter/twitter/tweet_consumer.rb in weeter-0.17.0
- old
+ new
@@ -1,6 +1,6 @@
-require 'twitter/json_stream'
+require 'em-twitter'
require 'multi_json'
module Weeter
module Twitter
class TweetConsumer
@@ -21,22 +21,26 @@
end
def connect(filter_params)
filter_params = limit_filter_params(filter_params)
filter_params = clean_filter_params(filter_params)
+ oauth_options = @config.auth_options[:oauth]
+ options = {
+ :path => '/1.1/statuses/filter.json',
+ :params => filter_params,
+ :oauth => {
+ :consumer_key => oauth_options[:consumer_key],
+ :consumer_secret => oauth_options[:consumer_secret],
+ :token => oauth_options[:access_key],
+ :token_secret => oauth_options[:access_secret]
+ }
+ }
- connect_options = {
- ssl: true,
- params: filter_params,
- method: 'POST'
- }.merge(@config.auth_options)
-
Weeter.logger.info("Connecting to Twitter stream...")
- @stream = ::Twitter::JSONStream.connect(connect_options)
-
- @stream.each_item do |item|
+ @client = EM::Twitter::Client.connect(options)
+ @client.each do |item|
begin
tweet_item = TweetItem.new(MultiJson.decode(item))
if tweet_item.limit_notice?
notify_missed_tweets(tweet_item)
@@ -50,21 +54,53 @@
rescue => ex
Weeter.logger.error("Twitter stream tweet exception: #{ex.class.name}: #{ex.message} #{tweet_item.to_json}")
end
end
- @stream.on_error do |msg|
+ @client.on_unauthorized do |msg|
+ Weeter.logger.debug("on_unauthorized: #{msg}")
+ end
+
+ @client.on_forbidden do |msg|
+ Weeter.logger.debug("on_forbidden: #{msg}")
+ end
+
+ @client.on_not_found do |msg|
+ Weeter.logger.debug("on_not_found: #{msg}")
+ end
+
+ @client.on_not_acceptable do |msg|
+ Weeter.logger.debug("on_not_acceptable: #{msg}")
+ end
+
+ @client.on_too_long do |msg|
+ Weeter.logger.debug("on_too_long: #{msg}")
+ end
+
+ @client.on_range_unacceptable do |msg|
+ Weeter.logger.debug("on_range_unacceptable: #{msg}")
+ end
+
+ @client.on_enhance_your_calm do |msg| # rate-limited
+ Weeter.logger.debug("on_enhance_your_calm: #{msg}")
+ end
+
+ @client.on_error do |msg|
Weeter.logger.error("Twitter stream error: #{msg}. Connect options were #{connect_options.inspect}")
end
- @stream.on_max_reconnects do |timeout, retries|
+ @client.on_reconnect do |msg|
+ Weeter.logger.debug("on_reconnect: #{msg}")
+ end
+
+ @client.on_max_reconnects do |timeout, retries|
Weeter.logger.error("Twitter stream max-reconnects reached: timeout=#{timeout}, retries=#{retries}")
end
end
def reconnect(filter_params)
- @stream.stop
- @stream.unbind
+ @client.stop
+ @client.unbind
connect(filter_params)
end
protected