require 'uri' require 'cgi' require 'eventmachine' require 'twitter/json_stream' require 'multi_json' module TweetStream # Provides simple access to the Twitter Streaming API (http://apiwiki.twitter.com/Streaming-API-Documentation) # for Ruby scripts that need to create a long connection to # Twitter for tracking and other purposes. # # Basic usage of the library is to call one of the provided # methods and provide a block that will perform actions on # a yielded TweetStream::Status. For example: # # TweetStream::Client.new.track('fail') do |status| # puts "[#{status.user.screen_name}] #{status.text}" # end # # For information about a daemonized TweetStream client, # view the TweetStream::Daemon class. class Client # @private attr_accessor *Configuration::VALID_OPTIONS_KEYS attr_accessor :timer # Creates a new API def initialize(options={}) options = TweetStream.options.merge(options) Configuration::VALID_OPTIONS_KEYS.each do |key| send("#{key}=", options[key]) end end # Get the JSON parser class for this client. def json_parser parser_from(parser) end # Returns all public statuses. The Firehose is not a generally # available resource. Few applications require this level of access. # Creative use of a combination of other resources and various access # levels can satisfy nearly every application use case. def firehose(query_parameters = {}, &block) start('statuses/firehose', query_parameters, &block) end # Returns all retweets. The retweet stream is not a generally available # resource. Few applications require this level of access. Creative # use of a combination of other resources and various access levels # can satisfy nearly every application use case. As of 9/11/2009, # the site-wide retweet feature has not yet launched, # so there are currently few, if any, retweets on this stream. def retweet(query_parameters = {}, &block) start('statuses/retweet', query_parameters, &block) end # Returns a random sample of all public statuses. The default access level # provides a small proportion of the Firehose. The "Gardenhose" access # level provides a proportion more suitable for data mining and # research applications that desire a larger proportion to be statistically # significant sample. def sample(query_parameters = {}, &block) start('statuses/sample', query_parameters, &block) end # Specify keywords to track. Queries are subject to Track Limitations, # described in Track Limiting and subject to access roles, described in # the statuses/filter method. Track keywords are case-insensitive logical # ORs. Terms are exact-matched, and also exact-matched ignoring # punctuation. Phrases, keywords with spaces, are not supported. # Keywords containing punctuation will only exact match tokens. # Query parameters may be passed as the last argument. def track(*keywords, &block) query_params = keywords.pop if keywords.last.is_a?(::Hash) query_params ||= {} filter(query_params.merge(:track => keywords), &block) end # Returns public statuses from or in reply to a set of users. Mentions # ("Hello @user!") and implicit replies ("@user Hello!" created without # pressing the reply "swoosh") are not matched. Requires integer user # IDs, not screen names. Query parameters may be passed as the last argument. def follow(*user_ids, &block) query_params = user_ids.pop if user_ids.last.is_a?(::Hash) query_params ||= {} filter(query_params.merge(:follow => user_ids), &block) end # Specifies a set of bounding boxes to track. Only tweets that are both created # using the Geotagging API and are placed from within a tracked bounding box will # be included in the stream – the user’s location field is not used to filter tweets # (e.g. if a user has their location set to “San Francisco”, but the tweet was not created # using the Geotagging API and has no geo element, it will not be included in the stream). # Bounding boxes are specified as a comma separate list of longitude/latitude pairs, with # the first pair denoting the southwest corner of the box # longitude/latitude pairs, separated by commas. The first pair specifies the southwest corner of the box. def locations(*locations_map, &block) query_params = locations_map.pop if locations_map.last.is_a?(::Hash) query_params ||= {} filter(query_params.merge(:locations => locations_map), &block) end # Make a call to the statuses/filter method of the Streaming API, # you may provide :follow, :track or both as options # to follow the tweets of specified users or track keywords. This # method is provided separately for cases when it would conserve the # number of HTTP connections to combine track and follow. def filter(query_params = {}, &block) start('statuses/filter', query_params.merge(:method => :post), &block) end # Make a call to the userstream api for currently authenticated user def userstream(&block) start('', :extra_stream_parameters => {:host => "userstream.twitter.com", :path => "/2/user.json"}, &block) end # Set a Proc to be run when a deletion notice is received # from the Twitter stream. For example: # # @client = TweetStream::Client.new # @client.on_delete do |status_id, user_id| # Tweet.delete(status_id) # end # # Block must take two arguments: the status id and the user id. # If no block is given, it will return the currently set # deletion proc. When a block is given, the TweetStream::Client # object is returned to allow for chaining. def on_delete(&block) if block_given? @on_delete = block self else @on_delete end end # Set a Proc to be run when a rate limit notice is received # from the Twitter stream. For example: # # @client = TweetStream::Client.new # @client.on_limit do |discarded_count| # # Make note of discarded count # end # # Block must take one argument: the number of discarded tweets. # If no block is given, it will return the currently set # limit proc. When a block is given, the TweetStream::Client # object is returned to allow for chaining. def on_limit(&block) if block_given? @on_limit = block self else @on_limit end end # Set a Proc to be run when an HTTP error is encountered in the # processing of the stream. Note that TweetStream will automatically # try to reconnect, this is for reference only. Don't panic! # # @client = TweetStream::Client.new # @client.on_error do |message| # # Make note of error message # end # # Block must take one argument: the error message. # If no block is given, it will return the currently set # error proc. When a block is given, the TweetStream::Client # object is returned to allow for chaining. def on_error(&block) if block_given? @on_error = block self else @on_error end end # Set a Proc to be run when a direct message is encountered in the # processing of the stream. # # @client = TweetStream::Client.new # @client.on_direct_message do |direct_message| # # do something with the direct message # end # # Block must take one argument: the direct message. # If no block is given, it will return the currently set # direct message proc. When a block is given, the TweetStream::Client # object is returned to allow for chaining. def on_direct_message(&block) if block_given? @on_direct_message = block self else @on_direct_message end end # Set a Proc to be run whenever anything is encountered in the # processing of the stream. # # @client = TweetStream::Client.new # @client.on_anything do |status| # # do something with the status # end # # Block can take one or two arguments. |status (, client)| # If no block is given, it will return the currently set # timeline status proc. When a block is given, the TweetStream::Client # object is returned to allow for chaining. def on_anything(&block) if block_given? @on_anything = block self else @on_anything end end # Set a Proc to be run when a regular timeline message is encountered in the # processing of the stream. # # @client = TweetStream::Client.new # @client.on_timeline_message do |status| # # do something with the status # end # # Block can take one or two arguments. |status (, client)| # If no block is given, it will return the currently set # timeline status proc. When a block is given, the TweetStream::Client # object is returned to allow for chaining. def on_timeline_status(&block) if block_given? @on_timeline_status = block self else @on_timeline_status end end # Set a Proc to be run when connection established. # Called in EventMachine::Connection#post_init # # @client = TweetStream::Client.new # @client.on_inited do # puts 'Connected...' # end # def on_inited(&block) if block_given? @on_inited = block self else @on_inited end end # Set a Proc to be run on a regular interval # independent of timeline status updates # # @client = TweetStream::Client.new # @client.on_interval(20) do # # do something every 20 seconds # end # def on_interval(time_interval=nil, &block) if block_given? @on_interval_time = time_interval @on_interval_proc = block self else [@on_interval_time, @on_interval_proc] end end def start(path, query_parameters = {}, &block) #:nodoc: method = query_parameters.delete(:method) || :get delete_proc = query_parameters.delete(:delete) || self.on_delete limit_proc = query_parameters.delete(:limit) || self.on_limit error_proc = query_parameters.delete(:error) || self.on_error inited_proc = query_parameters.delete(:inited) || self.on_inited direct_message_proc = query_parameters.delete(:direct_message) || self.on_direct_message timeline_status_proc = query_parameters.delete(:timeline_status) || self.on_timeline_status anything_proc = query_parameters.delete(:anything) || self.on_anything params = normalize_filter_parameters(query_parameters) extra_stream_parameters = query_parameters.delete(:extra_stream_parameters) || {} uri = method == :get ? build_uri(path, params) : build_uri(path) stream_params = { :path => uri, :method => method.to_s.upcase, :user_agent => user_agent, :on_inited => inited_proc, :filters => params.delete(:track), :params => params, :ssl => true }.merge(auth_params).merge(extra_stream_parameters) EventMachine.epoll EventMachine.kqueue EventMachine::run { if @on_interval_proc.is_a?(Proc) interval = @on_interval_time || Configuration::DEFAULT_TIMER_INTERVAL @timer = EventMachine.add_periodic_timer(interval) do EventMachine.defer do @on_interval_proc.call end end end @stream = Twitter::JSONStream.connect(stream_params) @stream.each_item do |item| begin raw_hash = json_parser.decode(item) rescue MultiJson::DecodeError error_proc.call("MultiJson::DecodeError occured in stream: #{item}") if error_proc.is_a?(Proc) next end unless raw_hash.is_a?(::Hash) error_proc.call("Unexpected JSON object in stream: #{item}") if error_proc.is_a?(Proc) next end hash = TweetStream::Hash.new(raw_hash) if hash[:delete] && hash[:delete][:status] delete_proc.call(hash[:delete][:status][:id], hash[:delete][:status][:user_id]) if delete_proc.is_a?(Proc) elsif hash[:limit] && hash[:limit][:track] limit_proc.call(hash[:limit][:track]) if limit_proc.is_a?(Proc) elsif hash[:direct_message] yield_message_to direct_message_proc, TweetStream::DirectMessage.new(hash[:direct_message]) elsif hash[:text] && hash[:user] @last_status = TweetStream::Status.new(hash) yield_message_to timeline_status_proc, @last_status if block_given? # Give the block the option to receive either one # or two arguments, depending on its arity. case block.arity when 1 yield @last_status when 2 yield @last_status, self end end end yield_message_to anything_proc, hash end @stream.on_error do |message| error_proc.call(message) if error_proc.is_a?(Proc) end @stream.on_max_reconnects do |timeout, retries| raise TweetStream::ReconnectError.new(timeout, retries) end } end # Terminate the currently running TweetStream. def stop EventMachine.stop_event_loop @last_status end protected def parser_from(parser) MultiJson.engine = parser MultiJson end def build_uri(path, query_parameters = {}) #:nodoc: URI.parse("/1/#{path}.json#{build_query_parameters(query_parameters)}") end def build_query_parameters(query) query.size > 0 ? "?#{build_post_body(query)}" : '' end def build_post_body(query) #:nodoc: return '' unless query && query.is_a?(::Hash) && query.size > 0 query.map do |k, v| v = v.flatten.collect { |q| q.to_s }.join(',') if v.is_a?(Array) "#{k.to_s}=#{CGI.escape(v.to_s)}" end.join('&') end def normalize_filter_parameters(query_parameters = {}) [:follow, :track, :locations].each do |param| if query_parameters[param].kind_of?(Array) query_parameters[param] = query_parameters[param].flatten.collect{|q| q.to_s}.join(',') elsif query_parameters[param] query_parameters[param] = query_parameters[param].to_s end end query_parameters end def auth_params case auth_method when :basic return :auth => "#{username}:#{password}" when :oauth return :oauth => { :consumer_key => consumer_key, :consumer_secret => consumer_secret, :access_key => oauth_token, :access_secret => oauth_token_secret } end end def yield_message_to(procedure, message) if procedure.is_a?(Proc) case procedure.arity when 1 procedure.call(message) when 2 procedure.call(message, self) end end end end end