# frozen_string_literal: true

require "json"
require "net/http"
require_relative "sql_response"

# This module handles the execution of SQL statements via the DBX API.
# For more information about the DBX SQL API, see: https://docs.databricks.com/sql/admin/sql-execution-tutorial.html
# Azure specific tutorial: https://learn.microsoft.com/en-us/azure/databricks/sql/api/sql-execution-tutorial
#
# The receiver is expected to provide the following (none are defined in this file):
# - @base_url        workspace base URL used to build the statements endpoint
# - @warehouse       SQL warehouse id sent as "warehouse_id"
# - @sleep_timer     seconds to sleep between status polls
# - #http            object responding to #request (presumably a Net::HTTP session — confirm)
# - #request_headers headers hash attached to every request
module DatabricksSQL
  # Raised by #wait_for_sql_response / #run_sql when +max_wait+ elapses while
  # the statement is still pending. The statement is NOT cancelled on the server.
  class StatementTimeoutError < StandardError; end

  # Statement-execution endpoint. The trailing slash matters: statement ids are
  # appended directly to it by #sql_results_request.
  # @return [String]
  def sql_url
    "#{@base_url}/api/2.0/sql/statements/"
  end

  # @return [URI::HTTP] parsed form of #sql_url
  def sql_uri
    URI(sql_url)
  end

  # Build the request that submits +sql+ for execution.
  # POST /api/2.0/sql/statements
  #
  # "wait_timeout" => "0s" means the call does not wait for the result; the
  # outcome is fetched afterwards by polling (see #wait_for_sql_response).
  #
  # @param sql [String] statement text
  # @return [Net::HTTP::Post]
  def sql_request(sql)
    params = {
      "statement" => sql,
      "wait_timeout" => "0s",
      "warehouse_id" => @warehouse
    }
    request = Net::HTTP::Post.new(sql_uri.request_uri, request_headers)
    request.body = params.to_json
    request
  end

  # POST +sql+ to DBX.
  # @param sql [String]
  # @return [DatabricksSQLResponse] the submission response; its statement_id is
  #   what #wait_for_sql_response polls on
  def post_sql_request(sql)
    response = http.request(sql_request(sql))
    DatabricksSQLResponse.new(response)
  end

  # Build the request that fetches a statement's status/result.
  # GET /api/2.0/sql/statements/{statement_id}
  # @param statement_id [String]
  # @return [Net::HTTP::Get]
  def sql_results_request(statement_id)
    req_uri = "#{sql_uri.request_uri}#{statement_id}"
    Net::HTTP::Get.new(req_uri, request_headers)
  end

  # GET one additional result chunk by the internal link DBX returned.
  # Logs the link to stdout.
  #
  # @param chunk_url [String] link obtained from DatabricksSQLResponse#next_chunk
  # @return [DatabricksSQLResponse] wrapper around the chunk payload.
  #   NOTE(review): the payload is expected to carry "chunk_index", "row_offset",
  #   "row_count" and "data_array" keys — confirm against DatabricksSQLResponse.
  def get_sql_chunk(chunk_url)
    puts "GET chunk: #{chunk_url}"
    request = Net::HTTP::Get.new(chunk_url, request_headers)
    response = http.request(request)
    DatabricksSQLResponse.new(response)
  end

  # Load additional chunks of data from DBX by following the chain of
  # next-chunk links until none remains.
  # DBX returns data with maximum chunk size of 16mb.
  #
  # @param response [DatabricksSQLResponse] receives each chunk via
  #   #add_chunk_to_data (presumably mutated in place — confirm)
  # @return [nil]
  def load_additional_chunks(response)
    next_chunk = response.next_chunk
    while next_chunk
      chunk_response = get_sql_chunk(next_chunk)
      response.add_chunk_to_data(chunk_response)
      next_chunk = chunk_response.next_chunk
    end
  end

  # GET the current status/results of a previously submitted statement.
  # Logs "<statement_id>: <status>" to stdout.
  #
  # @param dbx_sql_response [DatabricksSQLResponse] any response carrying the statement_id
  # @return [DatabricksSQLResponse]
  def get_sql_results(dbx_sql_response)
    statement_id = dbx_sql_response.statement_id
    http_response = http.request(sql_results_request(statement_id))
    response = DatabricksSQLResponse.new(http_response)
    puts "#{statement_id}: #{response.status}"
    response
  end

  # Poll DBX until the statement is no longer pending, sleeping @sleep_timer
  # seconds between polls.
  #
  # @param response [DatabricksSQLResponse] response returned by #post_sql_request
  # @param max_wait [Numeric, nil] give up after this many seconds of polling;
  #   nil (the default) keeps polling until the statement leaves the pending state
  # @return [DatabricksSQLResponse] the first non-pending result
  # @raise [StatementTimeoutError] if +max_wait+ elapses while still pending
  def wait_for_sql_response(response, max_wait: nil)
    # Monotonic clock so wall-clock adjustments cannot shorten/extend the wait.
    deadline = max_wait && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_wait)
    result = get_sql_results(response)
    while result.pending?
      if deadline && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
        raise StatementTimeoutError,
              "statement #{response.statement_id} still pending after #{max_wait}s"
      end

      sleep(@sleep_timer)
      result = get_sql_results(response)
    end
    result
  end

  # Submit SQL query to DBX, wait for it to finish and fetch any remaining
  # result chunks.
  #
  # @param sql [String]
  # @param max_wait [Numeric, nil] forwarded to #wait_for_sql_response; nil polls until done
  # @return [DatabricksSQLResponse]
  # @raise [StatementTimeoutError] when +max_wait+ is given and exceeded
  def run_sql(sql, max_wait: nil)
    posted_sql = post_sql_request(sql)
    sql_results = wait_for_sql_response(posted_sql, max_wait: max_wait)
    load_additional_chunks(sql_results) if sql_results.more_chunks?
    sql_results
  end
end