lib/daru/td.rb in daru-td-0.1.1 vs lib/daru/td.rb in daru-td-0.2.0
- old
+ new
@@ -56,17 +56,56 @@
header: header,
show_progress: show_progress,
clear_progress: clear_progress)
end
+ # Read Treasure Data query into a Daru's DataFrame.
+ #
+ # Returns a Daru::DataFrame corresponding to the result set of the query string.
+ #
+ # @param query [String]
+ # Query string to be executed.
+ # @param engine [Daru::TD::QueryEngine]
+ # Handler returned by create_engine.
+ # @param parse_dates [Array, nil]
+ # When an Array given, it has column names to parse as dates.
+ # @param distributed_join [true, false]
+ # (Presto only) If true, distributed join is enabled.
+ # If false (default), broadcast join is used.
+ # See https://prestodb.io/docs/current/release/release-0.77.html
+ # @params kwargs [Hash, nil]
+ # Parameters to pass to execute method.
+ # Available parameters:
+ # - result_url [String] is result output URL.
+ # - priority [Integer, String] is job's priority (e.g. "NORMAL", "HIGH", etc.)
+ # - retry_limit [Integer] is retry limit.
+ # @return [Daru::DataFrame]
def self.read_td_query(query, engine, **kwargs)
distributed_join = kwargs.delete(:distributed_join)
parse_dates = kwargs.delete(:parse_dates)
header = engine.create_header('read_td_query')
if engine.type == :presto && distributed_join
header += "-- set session distributed_join = #{!!distributed_join}\n"
end
result = engine.execute(header + query, **kwargs)
+ result.to_dataframe(parse_dates: parse_dates)
+ end
+
+ # Read Treasure Data job result int a Daru's DataFrame.
+ #
+ # Returns a DataFrame corresponding to the result set of the job.
+ # This method waits for job completion if the specified job is still running.
+ #
+ # @param job_id [Integer] Job ID.
+ # @param engine [Daru::TD::QueryEngine]
+ # Handler returned by create_engine.
+ # @param parse_dates [Array, nil]
+ # When an Array given, it has column names to parse as dates.
+ # @return [Daru::DataFrame]
+ def self.read_td_job(job_id, engine, **kwargs)
+ parse_dates = kwargs.delete(:parse_dates)
+ job = QueryEngine::JobWrapper.new(engine.connection.client.job(job_id))
+ result = engine.get_result(job, wait: true)
result.to_dataframe(parse_dates: parse_dates)
end
def self.parse_query(query_string)
CGI.parse(query_string).tap do |hash|