lib/weeter/twitter/tweet_consumer.rb in weeter-0.11.0 vs lib/weeter/twitter/tweet_consumer.rb in weeter-0.13.0

- old
+ new

@@ -2,12 +2,17 @@ require 'multi_json' module Weeter module Twitter class TweetConsumer + extend ::Forwardable - attr_reader :limiter + attr_reader :limiter, :notifier + def_delegators :@notifier, :notify_missed_tweets, + :notify_rate_limiting_initiated, + :delete_tweet, + :publish_tweet def initialize(twitter_config, notifier, limiter, subscriptions_limit = nil) @config = twitter_config @notifier = notifier @limiter = limiter @@ -16,35 +21,35 @@ def connect(filter_params) filter_params = limit_filter_params(filter_params) if @subscriptions_limit filter_params = clean_filter_params(filter_params) + connect_options = { ssl: true, params: filter_params, method: 'POST' }.merge(@config.auth_options) + Weeter.logger.info("Connecting to Twitter stream...") @stream = ::Twitter::JSONStream.connect(connect_options) @stream.each_item do |item| begin tweet_item = TweetItem.new(MultiJson.decode(item)) - if tweet_item.deletion? - @notifier.delete_tweet(tweet_item) + if tweet_item.limit_notice? + notify_missed_tweets(tweet_item) + elsif tweet_item.deletion? + delete_tweet(tweet_item) elsif tweet_item.publishable? - if limiter.limit?(*tweet_item.limiting_facets) - rate_limit_tweet(tweet_item) - else - @notifier.publish_tweet(tweet_item) - end + publish_or_rate_limit(tweet_item) else ignore_tweet(tweet_item) end rescue => ex - Weeter.logger.error("Twitter stream tweet exception: #{ex.class.name}: #{ex.message}") + Weeter.logger.error("Twitter stream tweet exception: #{ex.class.name}: #{ex.message} #{tweet_item.to_json}") end end @stream.on_error do |msg| Weeter.logger.error("Twitter stream error: #{msg}. Connect options were #{connect_options.inspect}") @@ -55,10 +60,11 @@ end end def reconnect(filter_params) @stream.stop + @stream.unbind connect(filter_params) end protected @@ -95,27 +101,42 @@ def clean_filter_params(p) return {} if p.nil? cleaned_params = {} cleaned_params['follow'] = p['follow'] if (p['follow'] || []).any? - cleaned_params['follow'] = cleaned_params['follow'].map(&:to_i) + cleaned_params['follow'] = cleaned_params['follow'].map(&:to_i) if cleaned_params['follow'] cleaned_params['track'] = p['track'] if (p['track'] || []).any? cleaned_params end def ignore_tweet(tweet_item) + return if tweet_item.disconnect_notice? id = tweet_item['id_str'] text = tweet_item['text'] - user_id = tweet_item['user']['id_str'] + user = tweet_item['user'] + user_id = user['id_str'] if user Weeter.logger.info("Ignoring tweet #{id} from user #{user_id}: #{text}") end def rate_limit_tweet(tweet_item) id = tweet_item['id_str'] text = tweet_item['text'] user_id = tweet_item['user']['id_str'] Weeter.logger.info("Rate Limiting tweet #{id} from user #{user_id}: #{text}") + end + + def publish_or_rate_limit(tweet_item) + limit_result = limiter.process(*tweet_item.limiting_facets) + case limit_result.status + when Weeter::Limitator::INITIATE_LIMITING + notify_rate_limiting_initiated(tweet_item, limit_result.limited_keys) + rate_limit_tweet(tweet_item) + when Weeter::Limitator::CONTINUE_LIMITING + rate_limit_tweet(tweet_item) + when Weeter::Limitator::DO_NOT_LIMIT + publish_tweet(tweet_item) + end end end end end