require "http/request" require "twitter/arguments" require "twitter/client" require "twitter/headers" require "twitter/streaming/connection" require "twitter/streaming/message_parser" require "twitter/streaming/response" require "twitter/utils" module Twitter module Streaming class Client < Twitter::Client include Twitter::Utils attr_writer :connection # Initializes a new Client object # # @param options [Hash] A customizable set of options. # @option options [String] :tcp_socket_class A class that Connection will use to create a new TCP socket. # @option options [String] :ssl_socket_class A class that Connection will use to create a new SSL socket. # @return [Twitter::Streaming::Client] def initialize(options = {}) super @connection = Streaming::Connection.new(options) end # Returns public statuses that match one or more filter predicates # # @see https://dev.twitter.com/streaming/reference/post/statuses/filter # @see https://dev.twitter.com/streaming/overview/request-parameters # @note At least one predicate parameter (follow, locations, or track) must be specified. # @param options [Hash] A customizable set of options. # @option options [String] :follow A comma separated list of user IDs, indicating the users to return statuses for in the stream. # @option options [String] :track Includes additional Tweets matching the specified keywords. Phrases of keywords are specified by a comma-separated list. # @option options [String] :locations Includes additional Tweets falling within the specified bounding boxes. # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects. def filter(options = {}, &) request(:post, "https://stream.twitter.com:443/1.1/statuses/filter.json", options, &) end # Returns all public statuses # # @see https://dev.twitter.com/streaming/reference/get/statuses/firehose # @see https://dev.twitter.com/streaming/overview/request-parameters # @note This endpoint requires special permission to access. # @param options [Hash] A customizable set of options. # @option options [Integer] :count The number of messages to backfill. # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects. def firehose(options = {}, &) request(:get, "https://stream.twitter.com:443/1.1/statuses/firehose.json", options, &) end # Returns a small random sample of all public statuses # # @see https://dev.twitter.com/streaming/reference/get/statuses/sample # @see https://dev.twitter.com/streaming/overview/request-parameters # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects. def sample(options = {}, &) request(:get, "https://stream.twitter.com:443/1.1/statuses/sample.json", options, &) end # Streams messages for a set of users # # @see https://dev.twitter.com/streaming/reference/get/site # @see https://dev.twitter.com/streaming/sitestreams # @see https://dev.twitter.com/streaming/overview/request-parameters # @note Site Streams is currently in a limited beta. Access is restricted to whitelisted accounts. # @overload site(*follow, options = {}, &block) # @param follow [Enumerable] A list of user IDs, indicating the users to return statuses for in the stream. # @param options [Hash] A customizable set of options. # @option options [String] :with Specifies whether to return information for just the users specified in the follow parameter, or include messages from accounts they follow. # @option options [String] :replies Specifies whether stall warnings should be delivered. # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects. def site(*args, &) arguments = Arguments.new(args) user_ids = collect_user_ids(arguments) request(:get, "https://sitestream.twitter.com:443/1.1/site.json", arguments.options.merge(follow: user_ids.join(",")), &) end # Streams messages for a single user # # @see https://dev.twitter.com/streaming/reference/get/user # @see https://dev.twitter.com/streaming/userstreams # @see https://dev.twitter.com/streaming/overview/request-parameters # @param options [Hash] A customizable set of options. # @option options [String] :with Specifies whether to return information for just the users specified in the follow parameter, or include messages from accounts they follow. # @option options [String] :replies Specifies whether to return additional @replies. # @option options [String] :stall_warnings Specifies whether stall warnings should be delivered. # @option options [String] :track Includes additional Tweets matching the specified keywords. Phrases of keywords are specified by a comma-separated list. # @option options [String] :locations Includes additional Tweets falling within the specified bounding boxes. # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects. def user(options = {}, &) request(:get, "https://userstream.twitter.com:443/1.1/user.json", options, &) end # Set a Proc to be run when connection established. def before_request(&block) if block @before_request = block self elsif instance_variable_defined?(:@before_request) @before_request else proc {} end end def close @connection.close end private def request(method, uri, params) before_request.call headers = Twitter::Headers.new(self, method, uri, params).request_headers request = HTTP::Request.new(verb: method, uri: "#{uri}?#{to_url_params(params)}", headers:, proxy:) response = Streaming::Response.new do |data| if item = Streaming::MessageParser.parse(data) # rubocop:disable Lint/AssignmentInCondition yield(item) end end @connection.stream(request, response) end def to_url_params(params) uri = Addressable::URI.new uri.query_values = params uri.query end # Takes a mixed array of Integers and Twitter::User objects and returns a # consistent array of Twitter user IDs. # # @param users [Array] # @return [Array] def collect_user_ids(users) users.filter_map do |user| case user when Integer then user when Twitter::User then user.id end end end end end end