# frozen_string_literal: true

require "json"
require "net/http"

# pry is a debugging aid that nothing in this class calls. Load it when it is
# installed, but do not make this file unloadable where it is absent.
begin
  require "pry"
rescue LoadError
  nil
end

# This class represents a response from the Databricks SQL API.
# It is used by DatabricksSQL to handle HTTP failures and parse the response body.
class DatabricksSQLResponse
  # States in which the statement has not finished yet.
  # PENDING means the warehouse is starting up.
  # RUNNING means the query is still executing.
  IN_PROGRESS_STATES = %w[PENDING RUNNING].freeze

  attr_accessor :raw_response, :body, :data_array

  # @param http_response [Object] the HTTP response to wrap. Anything that is
  #   not a Net::HTTPSuccess is treated as a failed request.
  # @raise [JSON::ParserError] if a successful response carries malformed JSON
  def initialize(http_response)
    self.raw_response = http_response
    self.body = parse_body
    self.data_array = extract_data_array
  end

  # -------------------- BODY --------------------

  # Parse the response body as JSON.
  # Failed requests, blank bodies and JSON payloads that are not an object all
  # yield an empty Hash, so the readers below can always call #dig with
  # string keys.
  # @return [Hash]
  # @raise [JSON::ParserError] if the body is non-blank but not valid JSON
  def parse_body
    return {} unless raw_response.is_a?(Net::HTTPSuccess)

    payload = raw_response.body
    # A 2xx response may legitimately have no body; JSON.parse would raise on it.
    return @body = {} if payload.to_s.strip.empty?

    parsed = JSON.parse(payload)
    @body = parsed.is_a?(Hash) ? parsed : {}
  end

  # Dig out the statement_id from the response body.
  # @return [String | nil]
  def statement_id
    body["statement_id"]
  end

  # -------------------- CHUNKS --------------------

  # Determine if the response contains multiple chunks.
  # @return [Boolean] false when the manifest or its chunk count is missing
  def more_chunks?
    chunk_count = body&.dig("manifest", "total_chunk_count")
    # nil.to_i is 0, so a missing count gives a strict false instead of nil.
    chunk_count.to_i > 1
  end

  # Dig out the next_chunk_internal_link from the response body.
  # @return [String | nil]
  def next_chunk
    body.dig("result", "next_chunk_internal_link")
  end

  # Combine the data from the chunk response into the data from the original response.
  # @param chunk_response [DatabricksSQLResponse] the response holding the next chunk
  # @return [Array] the combined data array, which is also stored on this response
  def add_chunk_to_data(chunk_response)
    chunk_data_array = chunk_response.data_array
    # Splatting tolerates nil on either side.
    self.data_array = [*data_array, *chunk_data_array]
  end

  # -------------------- STATUS --------------------

  # Determine if the response from the API has succeeded.
  # @return [Boolean]
  def success?
    status == "SUCCEEDED"
  end

  # Determine if the response from the API is still executing.
  # @return [Boolean]
  def pending?
    IN_PROGRESS_STATES.include?(status)
  end

  # Determine if the response from the API has failed.
  # @return [Boolean]
  def failed?
    status == "FAILED"
  end

  # Dig out the error message from the response body.
  # @return [String | nil]
  def error_message
    body.dig("status", "error", "message")
  end

  # Dig out the status of the query from the response body.
  # A request that failed at the HTTP level is reported as "FAILED".
  # @return [String | nil] nil when a successful response carries no state
  def status
    return "FAILED" unless raw_response.is_a?(Net::HTTPSuccess)

    body.dig("status", "state")
  end

  # ------------------- RESULTS --------------------

  # Dig out the columns array from the response body.
  # @return [Array]
  def columns
    body.dig("manifest", "schema", "columns") || []
  end

  # Dig out values array for the queried data.
  # Chunks have a simpler hash structure than initial SQL responses.
  # @return [Array]
  def extract_data_array
    body.dig("result", "data_array") || body["data_array"] || []
  end

  # Return the results of the query as an array of hashes, one per row,
  # keyed by column name.
  # @param symbolize_keys [Boolean] use Symbol keys instead of String keys
  # @return [Array<Hash>] empty when the query failed
  def results(symbolize_keys: false)
    return [] if failed?

    # The schema is the same for every row, so look it up once.
    schema = columns
    data_array.map do |row|
      schema.each_with_object({}) do |column, record|
        name = column["name"]
        record[symbolize_keys ? name.to_sym : name] = row[column["position"]]
      end
    end
  end
end