lib/weeter/twitter/tweet_consumer.rb in weeter-0.10.0 vs lib/weeter/twitter/tweet_consumer.rb in weeter-0.11.0

- old
+ new

@@ -5,31 +5,41 @@ module Twitter class TweetConsumer attr_reader :limiter - def initialize(twitter_config, notifier, limiter) + def initialize(twitter_config, notifier, limiter, subscriptions_limit = nil) @config = twitter_config @notifier = notifier @limiter = limiter + @subscriptions_limit = subscriptions_limit end def connect(filter_params) + filter_params = limit_filter_params(filter_params) if @subscriptions_limit filter_params = clean_filter_params(filter_params) - connect_options = {:ssl => true, :params => filter_params, :method => 'POST'}.merge(@config.auth_options) + + connect_options = { + ssl: true, + params: filter_params, + method: 'POST' + }.merge(@config.auth_options) + @stream = ::Twitter::JSONStream.connect(connect_options) @stream.each_item do |item| begin tweet_item = TweetItem.new(MultiJson.decode(item)) - if limiter.limit?(*tweet_item.limiting_facets) - rate_limit_tweet(tweet_item) - elsif tweet_item.deletion? + if tweet_item.deletion? @notifier.delete_tweet(tweet_item) elsif tweet_item.publishable? - @notifier.publish_tweet(tweet_item) + if limiter.limit?(*tweet_item.limiting_facets) + rate_limit_tweet(tweet_item) + else + @notifier.publish_tweet(tweet_item) + end else ignore_tweet(tweet_item) end rescue => ex Weeter.logger.error("Twitter stream tweet exception: #{ex.class.name}: #{ex.message}") @@ -49,9 +59,40 @@ @stream.stop connect(filter_params) end protected + + def limit_filter_params(params) + result = params.clone + result.default = [] + + follow_count = result['follow'].length + track_count = result['track'].length + + total = follow_count + track_count + + if total > @subscriptions_limit + Weeter.logger.error("Twitter Subscriptions are #{total}, but limited to #{@subscriptions_limit} subscriptions") + end + + while total > 1000 + if follow_count > track_count + follow_count -= 1 + elsif track_count > follow_count + track_count -= 1 + else + track_count -= 1 + end + + total = follow_count + track_count + end + + result['track'] = result['track'][0...track_count] + result['follow'] = result['follow'][0...follow_count] + + result + end def clean_filter_params(p) return {} if p.nil? cleaned_params = {} cleaned_params['follow'] = p['follow'] if (p['follow'] || []).any?