lib/weeter/twitter/tweet_consumer.rb in weeter-0.10.0 vs lib/weeter/twitter/tweet_consumer.rb in weeter-0.11.0
- old
+ new
@@ -5,31 +5,41 @@
module Twitter
class TweetConsumer
attr_reader :limiter
- def initialize(twitter_config, notifier, limiter)
+ def initialize(twitter_config, notifier, limiter, subscriptions_limit = nil)
@config = twitter_config
@notifier = notifier
@limiter = limiter
+ @subscriptions_limit = subscriptions_limit
end
def connect(filter_params)
+ filter_params = limit_filter_params(filter_params) if @subscriptions_limit
filter_params = clean_filter_params(filter_params)
- connect_options = {:ssl => true, :params => filter_params, :method => 'POST'}.merge(@config.auth_options)
+
+ connect_options = {
+ ssl: true,
+ params: filter_params,
+ method: 'POST'
+ }.merge(@config.auth_options)
+
@stream = ::Twitter::JSONStream.connect(connect_options)
@stream.each_item do |item|
begin
tweet_item = TweetItem.new(MultiJson.decode(item))
- if limiter.limit?(*tweet_item.limiting_facets)
- rate_limit_tweet(tweet_item)
- elsif tweet_item.deletion?
+ if tweet_item.deletion?
@notifier.delete_tweet(tweet_item)
elsif tweet_item.publishable?
- @notifier.publish_tweet(tweet_item)
+ if limiter.limit?(*tweet_item.limiting_facets)
+ rate_limit_tweet(tweet_item)
+ else
+ @notifier.publish_tweet(tweet_item)
+ end
else
ignore_tweet(tweet_item)
end
rescue => ex
Weeter.logger.error("Twitter stream tweet exception: #{ex.class.name}: #{ex.message}")
@@ -49,9 +59,40 @@
@stream.stop
connect(filter_params)
end
protected
+
+ def limit_filter_params(params)
+ result = params.clone
+ result.default = []
+
+ follow_count = result['follow'].length
+ track_count = result['track'].length
+
+ total = follow_count + track_count
+
+ if total > @subscriptions_limit
+ Weeter.logger.error("Twitter Subscriptions are #{total}, but limited to #{@subscriptions_limit} subscriptions")
+ end
+
+ while total > 1000
+ if follow_count > track_count
+ follow_count -= 1
+ elsif track_count > follow_count
+ track_count -= 1
+ else
+ track_count -= 1
+ end
+
+ total = follow_count + track_count
+ end
+
+ result['track'] = result['track'][0...track_count]
+ result['follow'] = result['follow'][0...follow_count]
+
+ result
+ end
def clean_filter_params(p)
return {} if p.nil?
cleaned_params = {}
cleaned_params['follow'] = p['follow'] if (p['follow'] || []).any?