lib/dbx/databricks/sql.rb in dbx-api-0.1.1 vs lib/dbx/databricks/sql.rb in dbx-api-0.2.0
- line removed (present only in 0.1.1)
+ line added (present only in 0.2.0)
@@ -1,8 +1,9 @@
# frozen_string_literal: true
require "json"
+require_relative "sql_response"
# This module handles the execution of SQL statements via the DBX API.
# For more information about the DBX SQL API, see: https://docs.databricks.com/sql/admin/sql-execution-tutorial.html
# Azure specific tutorial: https://learn.microsoft.com/en-us/azure/databricks/sql/api/sql-execution-tutorial
module DatabricksSQL
@@ -28,83 +29,68 @@
end
# POST SQL query to DBX
def post_sql_request(sql)
response = http.request(sql_request(sql))
- response.body
+ DatabricksSQLResponse.new(response)
end
# GET request object
# GET /api/2.0/sql/statements/{statement_id}
def sql_results_request(statement_id)
req_uri = "#{sql_uri.request_uri}#{statement_id}"
Net::HTTP::Get.new(req_uri, request_headers)
end
- # GET results of SQL query from DBX.
- def get_sql_results(http_response)
- statement_id = JSON.parse(http_response)["statement_id"]
- response = http.request(sql_results_request(statement_id))
- puts "#{statement_id}: #{JSON.parse(response.body)["status"]["state"]}"
- response.body
- end
-
# GET SQL chunk from DBX by internal link
+ # Chunk payload keys: "chunk_index", "row_offset", "row_count", "data_array".
+ # @return [DatabricksSQLResponse]
def get_sql_chunk(chunk_url)
+ puts "GET chunk: #{chunk_url}"
request = Net::HTTP::Get.new(chunk_url, request_headers)
response = http.request(request)
- response.body
+ DatabricksSQLResponse.new(response)
end
# Load additional chunks of data from DBX.
# DBX returns data with maximum chunk size of 16mb.
- def load_additional_chunks(results_hash)
- next_chunk = results_hash["result"]["next_chunk_internal_link"]
+ def load_additional_chunks(response)
+ next_chunk = response.next_chunk
while next_chunk
- response = get_sql_chunk(next_chunk)
- parsed_response = JSON.parse(response)
- result = parsed_response["data_array"]
- data = results_hash["result"]["data_array"]
- results_hash["result"]["data_array"] = [*data, *result]
- next_chunk = parsed_response["next_chunk_internal_link"]
+ chunk_response = get_sql_chunk(next_chunk)
+ response.add_chunk_to_data(chunk_response)
+ next_chunk = chunk_response.next_chunk
end
end
+ # GET results of SQL query from DBX.
+ def get_sql_results(dbx_sql_response)
+ statement_id = dbx_sql_response.statement_id
+ http_response = http.request(sql_results_request(statement_id))
+ response = DatabricksSQLResponse.new(http_response)
+ puts "#{statement_id}: #{response.status}"
+ response
+ end
+
# Wait for SQL query response from DBX.
# Returns a hash of the results of the SQL query.
def wait_for_sql_response(response)
result = get_sql_results(response)
- status = JSON.parse(result)["status"]["state"]
- # PENDING means the warehouse is starting up
- # RUNNING means the query is still executing
- while %w[PENDING RUNNING].include?(status)
- sleep(5)
+ still_running = result.pending?
+
+ while still_running
+ sleep(@sleep_timer)
result = get_sql_results(response)
- status = JSON.parse(result)["status"]["state"]
+ still_running = result.pending?
end
- JSON.parse(result)
+ result
end
- # Parse JSON response from DBX into array of hashes.
- # Provides output c/w Big Query.
- def parse_result(http_response)
- keys = JSON.parse(http_response)["manifest"]["schema"]["columns"]
- data_array = JSON.parse(http_response)["result"]["data_array"]
-
- data_array.map do |row|
- hash = {}
- keys.each do |key|
- hash[key["name"]] = row[key["position"]]
- end
- hash
- end
- end
-
# Submit SQL query to DBX and return results.
- # returns a JSON string of the results of the SQL query
+ # @return [DatabricksSQLResponse]
def run_sql(sql)
- response = post_sql_request(sql)
- results_hash = wait_for_sql_response(response)
- load_additional_chunks(results_hash) if results_hash["manifest"]["total_chunk_count"] > 1
- JSON.dump(results_hash)
+ posted_sql = post_sql_request(sql)
+ sql_results = wait_for_sql_response(posted_sql)
+
+ load_additional_chunks(sql_results) if sql_results.more_chunks?
+ sql_results
end
end
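
The caller-facing effect of this diff is the return type of run_sql: in dbx-api 0.1.1 it returned a JSON string that callers parsed themselves, while in 0.2.0 it returns a DatabricksSQLResponse (defined in sql_response.rb, which is not shown here). The sketch below illustrates that difference under stated assumptions: the WarehouseClient class and its setup are hypothetical, and the only DatabricksSQLResponse methods confirmed by this diff are the ones sql.rb itself calls (statement_id, status, pending?, next_chunk, add_chunk_to_data, more_chunks?).

  require "json"

  # Hypothetical consumer class; the gem is assumed to supply http, sql_uri,
  # request_headers and @sleep_timer elsewhere (connection setup is outside
  # this diff).
  class WarehouseClient
    include DatabricksSQL
  end

  client = WarehouseClient.new

  # dbx-api 0.1.1: run_sql returned a JSON string, so callers parsed it and
  # walked the raw DBX payload themselves.
  results = JSON.parse(client.run_sql("SELECT 1 AS one"))
  results["result"]["data_array"] # raw rows exactly as DBX returned them

  # dbx-api 0.2.0: run_sql returns a DatabricksSQLResponse instead.
  response = client.run_sql("SELECT 1 AS one")
  response.statement_id  # DBX statement id
  response.status        # query state, e.g. "SUCCEEDED"
  response.more_chunks?  # true when DBX split the result across >1 chunk

How row data is read back out of the response object (for example a rows- or hash-style accessor) lives in sql_response.rb and is not established by this diff.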