lib/weeter/twitter/tweet_consumer.rb in weeter-0.15.0 vs lib/weeter/twitter/tweet_consumer.rb in weeter-0.17.0

- old
+ new

@@ -1,6 +1,6 @@ -require 'twitter/json_stream' +require 'em-twitter' require 'multi_json' module Weeter module Twitter class TweetConsumer @@ -21,22 +21,26 @@ end def connect(filter_params) filter_params = limit_filter_params(filter_params) filter_params = clean_filter_params(filter_params) + oauth_options = @config.auth_options[:oauth] + options = { + :path => '/1.1/statuses/filter.json', + :params => filter_params, + :oauth => { + :consumer_key => oauth_options[:consumer_key], + :consumer_secret => oauth_options[:consumer_secret], + :token => oauth_options[:access_key], + :token_secret => oauth_options[:access_secret] + } + } - connect_options = { - ssl: true, - params: filter_params, - method: 'POST' - }.merge(@config.auth_options) - Weeter.logger.info("Connecting to Twitter stream...") - @stream = ::Twitter::JSONStream.connect(connect_options) - - @stream.each_item do |item| + @client = EM::Twitter::Client.connect(options) + @client.each do |item| begin tweet_item = TweetItem.new(MultiJson.decode(item)) if tweet_item.limit_notice? notify_missed_tweets(tweet_item) @@ -50,21 +54,53 @@ rescue => ex Weeter.logger.error("Twitter stream tweet exception: #{ex.class.name}: #{ex.message} #{tweet_item.to_json}") end end - @stream.on_error do |msg| + @client.on_unauthorized do |msg| + Weeter.logger.debug("on_unauthorized: #{msg}") + end + + @client.on_forbidden do |msg| + Weeter.logger.debug("on_forbidden: #{msg}") + end + + @client.on_not_found do |msg| + Weeter.logger.debug("on_not_found: #{msg}") + end + + @client.on_not_acceptable do |msg| + Weeter.logger.debug("on_not_acceptable: #{msg}") + end + + @client.on_too_long do |msg| + Weeter.logger.debug("on_too_long: #{msg}") + end + + @client.on_range_unacceptable do |msg| + Weeter.logger.debug("on_range_unacceptable: #{msg}") + end + + @client.on_enhance_your_calm do |msg| # rate-limited + Weeter.logger.debug("on_enhance_your_calm: #{msg}") + end + + @client.on_error do |msg| Weeter.logger.error("Twitter stream error: #{msg}. Connect options were #{connect_options.inspect}") end - @stream.on_max_reconnects do |timeout, retries| + @client.on_reconnect do |msg| + Weeter.logger.debug("on_reconnect: #{msg}") + end + + @client.on_max_reconnects do |timeout, retries| Weeter.logger.error("Twitter stream max-reconnects reached: timeout=#{timeout}, retries=#{retries}") end end def reconnect(filter_params) - @stream.stop - @stream.unbind + @client.stop + @client.unbind connect(filter_params) end protected