lib/daru/td.rb in daru-td-0.1.1 vs lib/daru/td.rb in daru-td-0.2.0

- old
+ new

@@ -56,17 +56,56 @@ header: header, show_progress: show_progress, clear_progress: clear_progress) end + # Read Treasure Data query into a Daru's DataFrame. + # + # Returns a Daru::DataFrame corresponding to the result set of the query string. + # + # @param query [String] + # Query string to be executed. + # @param engine [Daru::TD::QueryEngine] + # Handler returned by create_engine. + # @param parse_dates [Array, nil] + # When an Array given, it has column names to parse as dates. + # @param distributed_join [true, false] + # (Presto only) If true, distributed join is enabled. + # If false (default), broadcast join is used. + # See https://prestodb.io/docs/current/release/release-0.77.html + # @params kwargs [Hash, nil] + # Parameters to pass to execute method. + # Available parameters: + # - result_url [String] is result output URL. + # - priority [Integer, String] is job's priority (e.g. "NORMAL", "HIGH", etc.) + # - retry_limit [Integer] is retry limit. + # @return [Daru::DataFrame] def self.read_td_query(query, engine, **kwargs) distributed_join = kwargs.delete(:distributed_join) parse_dates = kwargs.delete(:parse_dates) header = engine.create_header('read_td_query') if engine.type == :presto && distributed_join header += "-- set session distributed_join = #{!!distributed_join}\n" end result = engine.execute(header + query, **kwargs) + result.to_dataframe(parse_dates: parse_dates) + end + + # Read Treasure Data job result int a Daru's DataFrame. + # + # Returns a DataFrame corresponding to the result set of the job. + # This method waits for job completion if the specified job is still running. + # + # @param job_id [Integer] Job ID. + # @param engine [Daru::TD::QueryEngine] + # Handler returned by create_engine. + # @param parse_dates [Array, nil] + # When an Array given, it has column names to parse as dates. + # @return [Daru::DataFrame] + def self.read_td_job(job_id, engine, **kwargs) + parse_dates = kwargs.delete(:parse_dates) + job = QueryEngine::JobWrapper.new(engine.connection.client.job(job_id)) + result = engine.get_result(job, wait: true) result.to_dataframe(parse_dates: parse_dates) end def self.parse_query(query_string) CGI.parse(query_string).tap do |hash|