module Clickhouse class Connection module Client include ActiveSupport::NumberHelper def connect! ping! unless connected? end def ping! ensure_authentication status = client.get("/").status if status != 200 raise ConnectionError, "Unexpected response status: #{status}" end true rescue Faraday::Error => e raise ConnectionError, e.message end def connected? instance_variables.include?(:@client) && !!@client end def get(query) request(:get, query) end def post(query, body = nil) request(:post, query, body) end def url "#{@config[:scheme]}://#{@config[:host]}:#{@config[:port]}" end private def client @client ||= Faraday.new(:url => url) end def ensure_authentication username, password = @config.values_at(:username, :password) client.basic_auth(username || "default", password) if username || password end def path(query) params = @config.select{|k, _v| k == :database} params[:query] = query params[:output_format_write_statistics] = 1 query_string = params.collect{|k, v| "#{k}=#{CGI.escape(v.to_s)}"}.join("&") "/?#{query_string}" end def request(method, query, body = nil) connect! query = query.strip start = Time.now response = client.send(method, path(query), body) status = response.status duration = Time.now - start query, format = Utils.extract_format(query) response = parse_body(format, response.body) stats = parse_stats(response) write_log duration, query, stats raise QueryError, "Got status #{status} (expected 200): #{response}" unless status == 200 response rescue Faraday::Error => e raise ConnectionError, e.message end def parse_body(format, body) case format when "JSON", "JSONCompact" body = JSON.parse(body) if body.strip[0] == "{" end body end def parse_stats(response) return {} unless response.is_a?(Hash) options = {:locale => :en, :precision => 2, :significant => false} stats = response["statistics"].merge("rows" => response["rows"]) factor = 1 / stats["elapsed"] stats["elapsed"] = number_to_human_duration(stats["elapsed"]) stats["rows_per_second"] = number_to_human(stats["rows_read"] * factor, options).downcase stats["data_per_second"] = number_to_human_size(stats["bytes_read"] * factor, options) stats["rows_read"] = number_to_human(stats["rows_read"], options).downcase stats["data_read"] = number_to_human_size(stats["bytes_read"], options) stats end def write_log(duration, query, stats) duration = number_to_human_duration(duration) rows, elapsed, rows_read, data_read, rows_per_second, data_per_second = stats.values_at(*%w( rows elapsed rows_read data_read rows_per_second data_per_second )) line1 = "\n \e[1mSQL (#{duration})\e[0m #{query};" line2 = "\n \e[1m#{rows} #{"row".pluralize(rows)} in set. Elapsed: #{elapsed}. Processed: #{rows_read} rows, #{data_read} (#{rows_per_second} rows/s, #{data_per_second}/s)\e[0m" if rows log :debug, "#{line1}#{line2} " end def number_to_human_duration(number) if (amount = number * 1000.0) < 1000 round = (amount < 1) ? 3 : 1 "#{amount.round(round)}ms" else "#{number.round(1)}s" end end end end end