lib/datasift.rb in datasift-2.1.1 vs lib/datasift.rb in datasift-3.0.0.beta
- old
+ new
@@ -1,15 +1,288 @@
-#This is the base file for the DataSift API library. Require this file to get
-#access to the full library functionality.
-require 'rubygems'
-
dir = File.dirname(__FILE__)
-require dir + '/DataSift/exceptions'
-require dir + '/DataSift/apiclient'
-require dir + '/DataSift/user'
-require dir + '/DataSift/definition'
-require dir + '/DataSift/historic'
-require dir + '/DataSift/push_definition'
-require dir + '/DataSift/push_subscription'
-require dir + '/DataSift/stream_consumer'
-require dir + '/DataSift/stream_consumer_http'
-require dir + '/DataSift/managed_source'
+
+require 'uri'
+require 'rest_client'
+require 'multi_json'
+require 'websocket_td'
+
+require dir + '/api/api_resource'
+
+require dir + '/errors'
+require dir + '/push'
+require dir + '/historics'
+require dir + '/historics_preview'
+require dir + '/managed_source'
+require dir + '/live_stream'
+
+require 'rbconfig'
+
+module DataSift
+ #
+ IS_WINDOWS = (RbConfig::CONFIG['host_os'] =~ /mswin|mingw|cygwin/)
+ VERSION = File.open('../VERSION').first
+
+ class Client < ApiResource
+
+ #+config+:: A hash containing configuration options for the client for e.g.
+ # {:username => 'some_user', :api_key => 'ds_api_key', :enable_ssl => true, :open_timeout => 30, :timeout => 30}
+ def initialize (config)
+ if config == nil
+ raise InvalidConfigError.new ('Config cannot be nil')
+ end
+ if config.key?(:api_key) == false || config.key?(:username) == false
+ raise InvalidConfigError.new('A valid username and API key are required')
+ end
+
+ @config = config
+ @historics = DataSift::Historics.new(config)
+ @push = DataSift::Push.new(config)
+ @managed_source = DataSift::ManagedSource.new(config)
+ @historics_preview = DataSift::HistoricsPreview.new(config)
+ end
+
+ attr_reader :historics, :push, :managed_source, :historics_preview
+
+ ##
+ # Checks if the syntax of the given CSDL is valid
+ def valid?(csdl)
+ requires({:csdl => csdl})
+ res= DataSift.request(:POST, 'validate', @config, {:csdl => csdl})
+ res[:http][:status] == 200
+ end
+
+ ##
+ # Compile CSDL code.
+ #+csdl+:: The CSDL you wish to compile
+ def compile(csdl)
+ requires({:csdl => csdl})
+ DataSift.request(:POST, 'compile', @config, {:csdl => csdl})
+ end
+
+ ##
+ # Check the number of objects processed and delivered for a given time period.
+ #+period+:: Can be "day", "hour", or "current", defaults to hour
+ def usage(period = :hour)
+ DataSift.request(:POST, 'usage', @config, {:period => period})
+ end
+
+ ##
+ # Calculate the DPU cost of consuming a stream.
+ def dpu(hash)
+ requires ({:hash => hash})
+ DataSift.request(:POST, 'dpu', @config, {:hash => hash})
+ end
+
+ ##
+ # Determine your credit balance or DPU balance.
+ def balance
+ DataSift.request(:POST, 'balance', @config, {})
+ end
+
+ ##
+ # Collect a batch of interactions from a push queue
+ def pull(id, size = 20971520, cursor='')
+ DataSift.request(:POST, 'pull', @config, {:id => id, :size => size, :cursor => cursor})
+ end
+
+ end
+
+
+ # Generates and executes an HTTP request from the params provided
+ # Params:
+ # +method+:: the HTTP method to use e.g. GET,POST
+ # +path+:: the DataSift path relevant to the base URL of the API
+ # +username+:: API username
+ # +api_key+:: DS api key
+ # +params+:: A hash representing the params to use in the request, if it's a get,head or delete request these params
+ # are used as query string params, if not they become form url encoded params
+ # +headers+:: any headers to pass to the API, Authorization header is automatically included
+ def self.request(method, path, config, params = {}, headers = {}, timeout=30, open_timeout=30, new_line_separated=false)
+ validate config
+ options = {}
+ url = build_url(path, config)
+ case method.to_s.downcase.to_sym
+ when :get, :head, :delete
+ url += "#{URI.parse(url).query ? '&' : '?'}#{encode params}"
+ payload = nil
+ else
+ payload = encode params
+ end
+
+ headers.update ({
+ :user_agent => "DataSift/#{config[:api_version]} Ruby/v#{VERSION}",
+ :authorization => "#{config[:username]}:#{config[:api_key]}",
+ :content_type => 'application/x-www-form-urlencoded'
+ })
+
+ options.update(
+ :headers => headers,
+ :method => method,
+ :open_timeout => open_timeout,
+ :timeout => timeout,
+ :payload => payload,
+ :url => url
+ )
+
+ begin
+ response = RestClient::Request.execute options
+ if response != nil && response.length > 0
+ if new_line_separated
+ res_arr = response.split("\n")
+ data = []
+ res_arr.each { |e|
+ interaction = MultiJson.load(e, :symbolize_keys => true)
+ data.push(interaction)
+ if params.has_key? :on_interaction
+ params[:on_interaction].call(interaction)
+ end
+ }
+ else
+ data = MultiJson.load(response, :symbolize_keys => true)
+ end
+ else
+ data = {}
+ end
+ {
+ :data => data,
+ :datasift => {
+ :x_ratelimit_limit => response.headers[:x_ratelimit_limit],
+ :x_ratelimit_remaining => response.headers[:x_ratelimit_remaining],
+ :x_ratelimit_cost => response.headers[:x_ratelimit_cost]
+ },
+ :http => {
+ :status => response.code,
+ :headers => response.headers
+ }
+ }
+ rescue MultiJson::DecodeError => de
+ raise DataSiftError.new response
+ rescue SocketError => e
+ process_client_error(e)
+ rescue RestClient::ExceptionWithResponse => e
+ begin
+ code = e.http_code
+ body = e.http_body
+ if code && body
+ error = MultiJson.load(body)
+ handle_api_error(e.http_code, error['error'] + " for URL #{url}")
+ else
+ process_client_error(e)
+ end
+ rescue MultiJson::DecodeError
+ process_client_error(e)
+ end
+ rescue RestClient::Exception, Errno::ECONNREFUSED => e
+ process_client_error (e)
+ end
+ end
+
+ def self.build_url(path, config)
+ 'http' + (config[:enable_ssl] ? 's' : '') + '://' + config[:api_host] + '/' + config[:api_version] + '/' + path
+ end
+
+ #returns true if username and api key are set
+ def self.is_invalid? config
+ !config.key?(:username) || !config.key?(:api_key)
+ end
+
+ def self.validate conf
+ if is_invalid? conf
+ raise InvalidConfigError.new 'A username and api_key are required'
+ end
+ end
+
+ def self.encode params
+ URI.escape(params.collect { |k, v| "#{k}=#{v}" }.join('&'))
+ end
+
+ def self.handle_api_error(code, body)
+ case code
+ when 400
+ raise BadRequestError.new(code, body)
+ when 401
+ raise AuthError.new(code, body)
+ when 404
+ raise ApiResourceNotFoundError.new(code, body)
+ else
+ raise DataSiftError.new(code, body)
+ end
+ end
+
+ def self.process_client_error(e)
+ case e
+ when RestClient::ServerBrokeConnection, RestClient::RequestTimeout
+ message = 'Unable to connect to DataSift. Please check your connection and try again'
+ when RestClient::SSLCertificateNotVerified
+ message = 'Failed to complete SSL verification'
+ when SocketError
+ message = 'Communication with DataSift failed. Are you able to resolve api.datasift.com?'
+ else
+ message = 'Unexpected error.'
+ end
+ raise ConnectionError.new(message + " (Network error: #{e.message})")
+ end
+
+ ##
+ # a Proc/lambda callback to receive delete messages
+ # DataSift and its customers are required to process Twitter's delete request, a delete handler must be provided
+ # a Proc/lambda callback to receive errors
+ # Because EventMachine is used errors can be raised from another thread, this method will receive any such errors
+ def self.new_stream(config, on_delete, on_error, on_open = nil, on_close = nil)
+ if on_delete == nil || on_error == nil
+ raise NotConfiguredError.new 'on_delete and on_error are required before you can connect'
+ end
+
+ begin
+ stream = WebsocketTD::Websocket.new('websocket.datasift.com', '/multi', "username=#{config[:username]}&api_key=#{config[:api_key]}")
+ connection = LiveStream.new(config, stream)
+
+ stream.on_open = lambda {
+ connection.connected = true
+ connection.retry_timeout = 0
+ on_open.call(connection) if on_open != nil
+ }
+
+ stream.on_close = lambda {
+ connection.connected = false
+ retry_connect(config, connection, on_delete, on_error, on_open, on_close, '', true)
+ }
+ stream.on_error = lambda {
+ connection.connected = false
+ on_error.call(connection) if on_close != nil
+ retry_connect(config, connection, on_delete, on_error, on_open, on_close)
+ }
+ stream.on_message =lambda { |msg|
+ data = MultiJson.load(msg.data, :symbolize_keys => true)
+ if data.has_key?(:deleted)
+ on_delete.call(connection, data)
+ elsif data.has_key?(:status)
+ connection.fire_ds_message(data)
+ else
+ connection.fire_on_message(data[:hash], data[:data])
+ end
+ }
+ rescue Exception => e
+ retry_connect(config, connection, on_delete, on_error, on_open, on_close, e.message)
+ end
+ connection
+ end
+
+ def self.retry_connect(config, connection, on_delete, on_error, on_open, on_close, message = '', use_closed = false)
+ connection.retry_timeout = connection.retry_timeout == 0 ? 10 : connection.retry_timeout * 2
+ if connection.retry_timeout > config[:max_retry_time]
+ if use_closed && on_close != nil
+ on_close.call(connection)
+ else
+ on_error.call ReconnectTimeoutError.new "Connecting to DataSift has failed, re-connection was attempted but
+ multiple consecutive failures where encountered. As a result no further
+ re-connection will be automatically attempted. Manually invoke connect() after
+ investigating the cause of the failure, be sure to observe DataSift's
+ re-connect policies available at http://dev.datasift.com/docs/streaming-api/reconnecting
+ - Error { #{message}}"
+ end
+ else
+ sleep connection.retry_timeout
+ new_stream(config, on_delete, on_error, on_open, on_close)
+ end
+ end
+end
\ No newline at end of file