lib/dexter/indexer.rb in pgdexter-0.1.1 vs lib/dexter/indexer.rb in pgdexter-0.1.2

- old
+ new

@@ -1,38 +1,102 @@ module Dexter class Indexer - attr_reader :client + include Logging - def initialize(client) - @client = client + def initialize(database_url, options) + @database_url = database_url + @create = options[:create] + @log_level = options[:log_level] + @exclude_tables = options[:exclude] + @log_sql = options[:log_sql] - select_all("SET client_min_messages = warning") - select_all("CREATE EXTENSION IF NOT EXISTS hypopg") + create_extension end def process_queries(queries) - # narrow down queries and tables - tables, queries = narrow_queries(queries) - return [] if tables.empty? + # reset hypothetical indexes + reset_hypothetical_indexes - # get ready for hypothetical indexes + # filter queries from other databases and system tables + tables = possible_tables(queries) + queries.each do |query| + query.missing_tables = !query.tables.all? { |t| tables.include?(t) } + end + + # exclude user specified tables + # TODO exclude write-heavy tables + @exclude_tables.each do |table| + tables.delete(table) + end + + # analyze tables if needed + analyze_tables(tables) if tables.any? + + # get initial costs for queries + calculate_initial_cost(queries.reject(&:missing_tables)) + + # create hypothetical indexes + candidates = tables.any? ? 
create_hypothetical_indexes(tables) : {} + + # get new costs and see if new indexes were used + new_indexes = determine_indexes(queries, candidates) + + # display and create new indexes + show_and_create_indexes(new_indexes) + end + + private + + def create_extension + select_all("SET client_min_messages = warning") + select_all("CREATE EXTENSION IF NOT EXISTS hypopg") + end + + def reset_hypothetical_indexes select_all("SELECT hypopg_reset()") + end - # ensure tables have recently been analyzed - analyze_tables(tables) + def analyze_tables(tables) + tables = tables.to_a.sort - # get initial plans - initial_plans = {} + analyze_stats = select_all <<-SQL + SELECT + schemaname AS schema, + relname AS table, + last_analyze, + last_autoanalyze + FROM + pg_stat_user_tables + WHERE + relname IN (#{tables.map { |t| quote(t) }.join(", ")}) + SQL + + last_analyzed = {} + analyze_stats.each do |stats| + last_analyzed[stats["table"]] = Time.parse(stats["last_analyze"]) if stats["last_analyze"] + end + + tables.each do |table| + if !last_analyzed[table] || last_analyzed[table] < Time.now - 3600 + statement = "ANALYZE #{quote_ident(table)}" + log "Running analyze: #{statement}" + select_all(statement) + end + end + end + + def calculate_initial_cost(queries) queries.each do |query| begin - initial_plans[query] = plan(query) + query.initial_cost = plan(query.statement)["Total Cost"] rescue PG::Error # do nothing end end - queries.select! 
{ |q| initial_plans[q] } + end + def create_hypothetical_indexes(tables) # get existing indexes index_set = Set.new indexes(tables).each do |index| # TODO make sure btree index_set << [index["table"], index["columns"]] @@ -43,72 +107,108 @@ columns(tables).each do |col| unless index_set.include?([col[:table], [col[:column]]]) candidates[col] = select_all("SELECT * FROM hypopg_create_index('CREATE INDEX ON #{col[:table]} (#{[col[:column]].join(", ")})');").first["indexname"] end end + candidates + end - queries_by_index = {} + def determine_indexes(queries, candidates) + new_indexes = {} - new_indexes = [] queries.each do |query| - starting_cost = initial_plans[query]["Total Cost"] - plan2 = plan(query) - cost2 = plan2["Total Cost"] - best_indexes = [] + if query.initial_cost + new_plan = plan(query.statement) + query.new_cost = new_plan["Total Cost"] + cost_savings = query.new_cost < query.initial_cost * 0.5 - candidates.each do |col, index_name| - if plan2.inspect.include?(index_name) && cost2 < starting_cost * 0.5 - best_indexes << { - table: col[:table], - columns: [col[:column]] - } - (queries_by_index[best_indexes.last] ||= []) << { - starting_cost: starting_cost, - final_cost: cost2, - query: query - } + query_indexes = [] + candidates.each do |col, index_name| + if new_plan.inspect.include?(index_name) + index = { + table: col[:table], + columns: [col[:column]] + } + query_indexes << index + + if cost_savings + new_indexes[index] ||= index.dup + (new_indexes[index][:queries] ||= []) << query + end + end end end - new_indexes.concat(best_indexes) + if @log_level == "debug2" + log "Processed #{query.fingerprint}" + if query.initial_cost + log "Cost: #{query.initial_cost} -> #{query.new_cost}" + + if query_indexes.any? 
+ log "Indexes: #{query_indexes.map { |i| "#{i[:table]} (#{i[:columns].join(", ")})" }.join(", ")}" + log "Need 50% cost savings to suggest index" unless cost_savings + else + log "Indexes: None" + end + elsif query.fingerprint == "unknown" + log "Could not parse query" + elsif query.tables.empty? + log "No tables" + elsif query.missing_tables + log "Tables not present in current database" + else + log "Could not run explain" + end + + puts + puts query.statement + puts + end end - new_indexes = new_indexes.uniq.sort_by(&:to_a) + new_indexes.values.sort_by(&:to_a) + end - # create indexes + def show_and_create_indexes(new_indexes) if new_indexes.any? new_indexes.each do |index| - index[:queries] = queries_by_index[index] - log "Index found: #{index[:table]} (#{index[:columns].join(", ")})" - # log "CREATE INDEX CONCURRENTLY ON #{index[:table]} (#{index[:columns].join(", ")});" - # index[:queries].sort_by { |q| fingerprints[q[:query]] }.each do |query| - # log "Query #{fingerprints[query[:query]]} (Cost: #{query[:starting_cost]} -> #{query[:final_cost]})" - # puts - # puts query[:query] - # puts - # end + + if @log_level.start_with?("debug") + index[:queries].sort_by(&:fingerprint).each do |query| + log "Query #{query.fingerprint} (Cost: #{query.initial_cost} -> #{query.new_cost})" + puts + puts query.statement + puts + end + end end - new_indexes.each do |index| - statement = "CREATE INDEX CONCURRENTLY ON #{index[:table]} (#{index[:columns].join(", ")})" - # puts "#{statement};" - if client.options[:create] + if @create + # TODO use advisory locks + # 1. create lock + # 2. refresh existing index list + # 3. create indexes that still don't exist + # 4. 
release lock + new_indexes.each do |index| + statement = "CREATE INDEX CONCURRENTLY ON #{index[:table]} (#{index[:columns].join(", ")})" log "Creating index: #{statement}" started_at = Time.now select_all(statement) log "Index created: #{((Time.now - started_at) * 1000).to_i} ms" end end + else + log "No indexes found" end new_indexes end def conn @conn ||= begin - uri = URI.parse(client.arguments[0]) + uri = URI.parse(@database_url) config = { host: uri.host, port: uri.port, dbname: uri.path.sub(/\A\//, ""), user: uri.user, @@ -120,32 +220,42 @@ rescue PG::ConnectionBad abort "Bad database url" end def select_all(query) - conn.exec(query).to_a + # use exec_params instead of exec for security + # + # Unlike PQexec, PQexecParams allows at most one SQL command in the given string. + # (There can be semicolons in it, but not more than one nonempty command.) + # This is a limitation of the underlying protocol, but has some usefulness + # as an extra defense against SQL-injection attacks. + # https://www.postgresql.org/docs/current/static/libpq-exec.html + query = squish(query) + log "SQL: #{query}" if @log_sql + conn.exec_params(query, []).to_a end def plan(query) - JSON.parse(select_all("EXPLAIN (FORMAT JSON) #{query}").first["QUERY PLAN"]).first["Plan"] + # strip semi-colons as another measure of defense + JSON.parse(select_all("EXPLAIN (FORMAT JSON) #{query.gsub(";", "")}").first["QUERY PLAN"]).first["Plan"] end - def narrow_queries(queries) + def database_tables result = select_all <<-SQL SELECT table_name FROM information_schema.tables WHERE table_catalog = current_database() AND table_schema NOT IN ('pg_catalog', 'information_schema') SQL - possible_tables = Set.new(result.map { |r| r["table_name"] }) + result.map { |r| r["table_name"] } + end - tables = queries.flat_map { |q| PgQuery.parse(q).tables }.uniq.select { |t| possible_tables.include?(t) } - - [tables, queries.select { |q| PgQuery.parse(q).tables.all? 
{ |t| possible_tables.include?(t) } }] + def possible_tables(queries) + Set.new(queries.flat_map(&:tables).uniq & database_tables) end def columns(tables) columns = select_all <<-SQL SELECT @@ -166,17 +276,11 @@ SELECT schemaname AS schema, t.relname AS table, ix.relname AS name, regexp_replace(pg_get_indexdef(i.indexrelid), '^[^\\(]*\\((.*)\\)$', '\\1') AS columns, - regexp_replace(pg_get_indexdef(i.indexrelid), '.* USING ([^ ]*) \\(.*', '\\1') AS using, - indisunique AS unique, - indisprimary AS primary, - indisvalid AS valid, - indexprs::text, - indpred::text, - pg_get_indexdef(i.indexrelid) AS definition + regexp_replace(pg_get_indexdef(i.indexrelid), '.* USING ([^ ]*) \\(.*', '\\1') AS using FROM pg_index i INNER JOIN pg_class t ON t.oid = i.indrelid INNER JOIN @@ -201,49 +305,28 @@ else part end end - def analyze_tables(tables) - analyze_stats = select_all <<-SQL - SELECT - schemaname AS schema, - relname AS table, - last_analyze, - last_autoanalyze - FROM - pg_stat_user_tables - WHERE - relname IN (#{tables.map { |t| quote(t) }.join(", ")}) - SQL - - last_analyzed = {} - analyze_stats.each do |stats| - last_analyzed[stats["table"]] = Time.parse(stats["last_analyze"]) if stats["last_analyze"] - end - - tables.each do |table| - if !last_analyzed[table] || last_analyzed[table] < Time.now - 3600 - log "Analyzing #{table}" - select_all("ANALYZE #{table}") - end - end + def quote_ident(value) + conn.quote_ident(value) end def quote(value) if value.is_a?(String) "'#{quote_string(value)}'" else value end end - # activerecord + # from activerecord def quote_string(s) s.gsub(/\\/, '\&\&').gsub(/'/, "''") end - def log(message) - puts "#{Time.now.iso8601} #{message}" + # from activesupport + def squish(str) + str.to_s.gsub(/\A[[:space:]]+/, "").gsub(/[[:space:]]+\z/, "").gsub(/[[:space:]]+/, " ") end end end