require "clickhouse/connection/query/table" require "clickhouse/connection/query/result_set" require "clickhouse/connection/query/result_row" module Clickhouse class Connection module Query def execute(query, body = nil) body = post(query, body).body.to_s body.empty? ? true : body end def query(query) query = query.to_s.gsub(/(;|\bFORMAT \w+)/i, "").strip query += " FORMAT JSONCompact" parse_response get(query).body.to_s end def databases query("SHOW DATABASES").flatten end def tables query("SHOW TABLES").flatten end def create_table(name, &block) execute(Clickhouse::Connection::Query::Table.new(name, &block).to_sql) end def describe_table(name) query("DESCRIBE TABLE #{name}").to_a end def rename_table(*args) names = (args[0].is_a?(Hash) ? args[0].to_a : [args]).flatten raise Clickhouse::InvalidQueryError, "Odd number of table names" unless (names.size % 2) == 0 names = Hash[*names].collect{|(from, to)| "#{from} TO #{to}"} execute "RENAME TABLE #{names.join(", ")}" end def drop_table(name) execute "DROP TABLE #{name}" end def insert_rows(table, options = {}) options[:csv] ||= begin options[:rows] ||= yield([]) generate_csv options[:rows], options[:names] end execute "INSERT INTO #{table} FORMAT CSVWithNames", options[:csv] end def select_rows(options) query to_select_query(options) end def select_row(options) select_rows(options)[0] end def select_values(options) select_rows(options).collect{|row| row[0]} end def select_value(options) values = select_values(options) values[0] if values end def count(options) options = options.merge(:select => "COUNT(*)") select_value(options).to_i end def to_select_query(options) to_select_options(options).collect do |(key, value)| next if value.nil? || (value.respond_to?(:empty?) && value.empty?) statement = [key.to_s.upcase] statement << "BY" if %W(GROUP ORDER).include?(statement[0]) statement << to_segment(key, value) statement.join(" ") end.compact.join("\n").force_encoding("UTF-8") end private def generate_csv(rows, names = nil) hashes = rows[0].is_a?(Hash) if hashes names ||= rows[0].keys end CSV.generate do |csv| csv << names if names rows.each do |row| csv << (hashes ? row.values_at(*names) : row) end end end def inspect_value(value) value.nil? ? "NULL" : value.inspect.gsub(/(^"|"$)/, "'").gsub("\\\"", "\"") end def to_select_options(options) keys = [:select, :from, :where, :group, :having, :order, :limit, :offset] options = Hash[keys.zip(options.values_at(*keys))] options[:select] ||= "*" options[:limit] ||= 0 if options[:offset] options[:limit] = options.values_at(:offset, :limit).compact.join(", ") if options[:limit] options.delete(:offset) options end def to_segment(type, value) case type when :select [value].flatten.join(", ") when :where, :having value.is_a?(Hash) ? to_condition_statements(value) : value else value end end def to_condition_statements(value) value.collect do |attr, val| if val == :empty "empty(#{attr})" elsif val.is_a?(Range) [ "#{attr} >= #{inspect_value(val.first)}", "#{attr} <= #{inspect_value(val.last)}" ] elsif val.is_a?(Array) "#{attr} IN (#{val.collect{|x| inspect_value(x)}.join(", ")})" elsif val.to_s.match(/^`.*`$/) "#{attr} #{val.gsub(/(^`|`$)/, "")}" else "#{attr} = #{inspect_value(val)}" end end.flatten.join(" AND ") end def parse_response(response) data = JSON.parse response names = data["meta"].collect{|column| column["name"]} types = data["meta"].collect{|column| column["type"]} ResultSet.new data["data"], names, types end end end end