lib/dbx/databricks/sql.rb in dbx-api-0.1.1 vs lib/dbx/databricks/sql.rb in dbx-api-0.2.0

- old
+ new

@@ -1,8 +1,9 @@ # frozen_string_literal: true require "json" +require_relative "sql_response" # This module handles the execution of SQL statements via the DBX API. # For more information about the DBX SQL API, see: https://docs.databricks.com/sql/admin/sql-execution-tutorial.html # Azure specific tutorial: https://learn.microsoft.com/en-us/azure/databricks/sql/api/sql-execution-tutorial module DatabricksSQL @@ -28,83 +29,68 @@ end # POST SQL query to DBX def post_sql_request(sql) response = http.request(sql_request(sql)) - response.body + DatabricksSQLResponse.new(response) end # GET request object # GET /api/2.0/sql/statements/{statement_id} def sql_results_request(statement_id) req_uri = "#{sql_uri.request_uri}#{statement_id}" Net::HTTP::Get.new(req_uri, request_headers) end - # GET results of SQL query from DBX. - def get_sql_results(http_response) - statement_id = JSON.parse(http_response)["statement_id"] - response = http.request(sql_results_request(statement_id)) - puts "#{statement_id}: #{JSON.parse(response.body)["status"]["state"]}" - response.body - end - # GET SQL chunk from DBX by internal link + # @return [Hash<{"chunk_index" => Number, "row_offset" => Number, "row_count" => Number, "data_array" => Array<Array>}>] # rubocop:disable Layout/LineLength def get_sql_chunk(chunk_url) + puts "GET chunk: #{chunk_url}" request = Net::HTTP::Get.new(chunk_url, request_headers) response = http.request(request) - response.body + DatabricksSQLResponse.new(response) end # Load additional chunks of data from DBX. # DBX returns data with maximum chunk size of 16mb. - def load_additional_chunks(results_hash) - next_chunk = results_hash["result"]["next_chunk_internal_link"] + def load_additional_chunks(response) + next_chunk = response.next_chunk while next_chunk - response = get_sql_chunk(next_chunk) - parsed_response = JSON.parse(response) - result = parsed_response["data_array"] - data = results_hash["result"]["data_array"] - results_hash["result"]["data_array"] = [*data, *result] - next_chunk = parsed_response["next_chunk_internal_link"] + chunk_response = get_sql_chunk(next_chunk) + response.add_chunk_to_data(chunk_response) + next_chunk = chunk_response.next_chunk end end + # GET results of SQL query from DBX. + def get_sql_results(dbx_sql_response) + statement_id = dbx_sql_response.statement_id + http_response = http.request(sql_results_request(statement_id)) + response = DatabricksSQLResponse.new(http_response) + puts "#{statement_id}: #{response.status}" + response + end + # Wait for SQL query response from DBX. # Returns a hash of the results of the SQL query. def wait_for_sql_response(response) result = get_sql_results(response) - status = JSON.parse(result)["status"]["state"] - # PENDING means the warehouse is starting up - # RUNNING means the query is still executing - while %w[PENDING RUNNING].include?(status) - sleep(5) + still_running = result.pending? + + while still_running + sleep(@sleep_timer) result = get_sql_results(response) - status = JSON.parse(result)["status"]["state"] + still_running = result.pending? end - JSON.parse(result) + result end - # Parse JSON response from DBX into array of hashes. - # Provides output c/w Big Query. - def parse_result(http_response) - keys = JSON.parse(http_response)["manifest"]["schema"]["columns"] - data_array = JSON.parse(http_response)["result"]["data_array"] - - data_array.map do |row| - hash = {} - keys.each do |key| - hash[key["name"]] = row[key["position"]] - end - hash - end - end - # Submit SQL query to DBX and return results. - # returns a JSON string of the results of the SQL query + # @return [DatabricksSQLResponse] def run_sql(sql) - response = post_sql_request(sql) - results_hash = wait_for_sql_response(response) - load_additional_chunks(results_hash) if results_hash["manifest"]["total_chunk_count"] > 1 - JSON.dump(results_hash) + posted_sql = post_sql_request(sql) + sql_results = wait_for_sql_response(posted_sql) + + load_additional_chunks(sql_results) if sql_results.more_chunks? + sql_results end end