# frozen_string_literal: true

require "json"
require "net/http"
require "uri"

# This module handles the execution of SQL statements via the DBX API.
# For more information about the DBX SQL API, see:
# https://docs.databricks.com/sql/admin/sql-execution-tutorial.html
# Azure-specific tutorial:
# https://learn.microsoft.com/en-us/azure/databricks/sql/api/sql-execution-tutorial
module DatabricksSQL
  def sql_url
    "#{@base_url}/api/2.0/sql/statements/"
  end

  def sql_uri
    URI(sql_url)
  end

  # POST request object
  # POST /api/2.0/sql/statements
  def sql_request(sql)
    params = {
      "statement" => sql,
      "wait_timeout" => "0s",
      "warehouse_id" => @warehouse
    }
    request = Net::HTTP::Post.new(sql_uri.request_uri, request_headers)
    request.body = params.to_json
    request
  end

  # POST SQL query to DBX
  def post_sql_request(sql)
    response = http.request(sql_request(sql))
    response.body
  end

  # GET request object
  # GET /api/2.0/sql/statements/{statement_id}
  def sql_results_request(statement_id)
    req_uri = "#{sql_uri.request_uri}#{statement_id}"
    Net::HTTP::Get.new(req_uri, request_headers)
  end

  # GET results of SQL query from DBX.
  def get_sql_results(http_response)
    statement_id = JSON.parse(http_response)["statement_id"]
    response = http.request(sql_results_request(statement_id))
    puts "#{statement_id}: #{JSON.parse(response.body)["status"]["state"]}"
    response.body
  end

  # GET SQL chunk from DBX by internal link
  def get_sql_chunk(chunk_url)
    request = Net::HTTP::Get.new(chunk_url, request_headers)
    response = http.request(request)
    response.body
  end

  # Load additional chunks of data from DBX.
  # DBX returns data with a maximum chunk size of 16 MB.
  def load_additional_chunks(results_hash)
    next_chunk = results_hash["result"]["next_chunk_internal_link"]
    while next_chunk
      response = get_sql_chunk(next_chunk)
      parsed_response = JSON.parse(response)
      result = parsed_response["data_array"]
      data = results_hash["result"]["data_array"]
      results_hash["result"]["data_array"] = [*data, *result]
      next_chunk = parsed_response["next_chunk_internal_link"]
    end
  end

  # Wait for SQL query response from DBX.
  # Returns a hash of the results of the SQL query.
  def wait_for_sql_response(response)
    result = get_sql_results(response)
    status = JSON.parse(result)["status"]["state"]
    # PENDING means the warehouse is starting up.
    # RUNNING means the query is still executing.
    while %w[PENDING RUNNING].include?(status)
      sleep(5)
      result = get_sql_results(response)
      status = JSON.parse(result)["status"]["state"]
    end
    JSON.parse(result)
  end

  # Parse JSON response from DBX into an array of hashes.
  # Provides output compatible with BigQuery.
  def parse_result(http_response)
    keys = JSON.parse(http_response)["manifest"]["schema"]["columns"]
    data_array = JSON.parse(http_response)["result"]["data_array"]
    data_array.map do |row|
      hash = {}
      keys.each do |key|
        hash[key["name"]] = row[key["position"]]
      end
      hash
    end
  end

  # Submit SQL query to DBX and return results.
  # Returns a JSON string of the results of the SQL query.
  def run_sql(sql)
    response = post_sql_request(sql)
    results_hash = wait_for_sql_response(response)
    # Guard with dig: a failed statement has no "manifest" key.
    load_additional_chunks(results_hash) if results_hash.dig("manifest", "total_chunk_count").to_i > 1
    JSON.dump(results_hash)
  end
end
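
# --- Usage sketch (illustrative, not part of the module) ---
# The module expects its host class to provide `http` (a Net::HTTP
# connection), `request_headers`, and the `@base_url` / `@warehouse`
# instance variables. The client below is a minimal, hypothetical
# example of such a host: the class name and environment variable
# names are assumptions for illustration, not part of the DBX API.
#
#   class DatabricksClient
#     include DatabricksSQL
#
#     def initialize
#       @base_url = ENV.fetch("DATABRICKS_HOST")        # e.g. "https://adb-123.azuredatabricks.net"
#       @token = ENV.fetch("DATABRICKS_TOKEN")          # personal access token
#       @warehouse = ENV.fetch("DATABRICKS_WAREHOUSE_ID")
#     end
#
#     # Reusable HTTPS connection for all requests.
#     def http
#       @http ||= Net::HTTP.new(sql_uri.host, sql_uri.port).tap { |h| h.use_ssl = true }
#     end
#
#     def request_headers
#       {
#         "Authorization" => "Bearer #{@token}",
#         "Content-Type" => "application/json"
#       }
#     end
#   end
#
#   client = DatabricksClient.new
#   raw = client.run_sql("SELECT 1 AS one")
#   rows = client.parse_result(raw) # => array of row hashes keyed by column name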