lib/dexter/indexer.rb in pgdexter-0.2.1 vs lib/dexter/indexer.rb in pgdexter-0.3.0

- old
+ new

@@ -8,10 +8,12 @@ @exclude_tables = options[:exclude] @include_tables = Array(options[:include].split(",")) if options[:include] @log_sql = options[:log_sql] @log_explain = options[:log_explain] @min_time = options[:min_time] || 0 + @min_calls = options[:min_calls] || 0 + @analyze = options[:analyze] @options = options create_extension unless extension_exists? execute("SET lock_timeout = '5s'") end @@ -24,28 +26,43 @@ def process_queries(queries) # reset hypothetical indexes reset_hypothetical_indexes - # filter queries from other databases and system tables - tables = possible_tables(queries) - queries.each do |query| - query.missing_tables = !query.tables.all? { |t| tables.include?(t) } - end + tables = Set.new(database_tables) if @include_tables - tables = Set.new(tables.to_a & @include_tables) + include_set = Set.new(@include_tables) + tables.keep_if { |t| include_set.include?(t) || include_set.include?(t.split(".")[-1]) } end - # exclude user specified tables - # TODO exclude write-heavy tables - @exclude_tables.each do |table| - tables.delete(table) + if @exclude_tables.any? + exclude_set = Set.new(@exclude_tables) + tables.delete_if { |t| exclude_set.include?(t) || exclude_set.include?(t.split(".")[-1]) } end + # map tables without schema to schema + no_schema_tables = {} + search_path_index = Hash[search_path.map.with_index.to_a] + tables.group_by { |t| t.split(".")[-1] }.each do |group, t2| + no_schema_tables[group] = t2.sort_by { |t| search_path_index[t.split(".")[0]] || 1000000 }[0] + end + + # filter queries from other databases and system tables + queries.each do |query| + # add schema to table if needed + query.tables = query.tables.map { |t| no_schema_tables[t] || t } + + # check for missing tables + query.missing_tables = !query.tables.all? { |t| tables.include?(t) } + end + + # set tables + tables = Set.new(queries.reject(&:missing_tables).flat_map(&:tables)) + # analyze tables if needed - analyze_tables(tables) if tables.any? + analyze_tables(tables) if tables.any? && (@analyze || @log_level == "debug2") # create hypothetical indexes and explain queries candidates = tables.any? ? create_hypothetical_indexes(queries.reject(&:missing_tables), tables) : {} # see if new indexes were used and meet bar @@ -79,27 +96,33 @@ def analyze_tables(tables) tables = tables.to_a.sort analyze_stats = execute <<-SQL SELECT - schemaname AS schema, - relname AS table, + schemaname || '.' || relname AS table, last_analyze, last_autoanalyze FROM pg_stat_user_tables WHERE - relname IN (#{tables.map { |t| quote(t) }.join(", ")}) + schemaname || '.' || relname IN (#{tables.map { |t| quote(t) }.join(", ")}) SQL last_analyzed = {} analyze_stats.each do |stats| last_analyzed[stats["table"]] = Time.parse(stats["last_analyze"]) if stats["last_analyze"] end tables.each do |table| - if !last_analyzed[table] || last_analyzed[table] < Time.now - 3600 + la = last_analyzed[table] + + if @log_level == "debug2" + time_str = la ? la.iso8601 : "Unknown" + log "Last analyze: #{table} : #{time_str}" + end + + if @analyze && (!la || la < Time.now - 3600) statement = "ANALYZE #{quote_ident(table)}" log "Running analyze: #{statement}" execute(statement) end end @@ -135,10 +158,11 @@ if tables.any? # since every set of multi-column indexes are expensive # try to parse out columns possible_columns = Set.new explainable_queries.each do |query| + log "Finding columns: #{query.statement}" if @log_level == "debug3" find_columns(query.tree).each do |col| last_col = col["fields"].last if last_col["String"] possible_columns << last_col["String"]["str"] end @@ -294,76 +318,79 @@ "None" end end def show_and_create_indexes(new_indexes, queries, tables) + # print summary if new_indexes.any? new_indexes.each do |index| log "Index found: #{index[:table]} (#{index[:columns].join(", ")})" end + else + log "No new indexes found" + end - if @log_level.start_with?("debug") - index_queries = new_indexes.flat_map { |i| i[:queries].sort_by(&:fingerprint) } - if @log_level == "debug2" - fingerprints = Set.new(index_queries.map(&:fingerprint)) - index_queries.concat(queries.reject { |q| fingerprints.include?(q.fingerprint) }.sort_by(&:fingerprint)) - end - index_queries.each do |query| - log "-" * 80 - log "Query #{query.fingerprint}" - log "Total time: #{(query.total_time / 60000.0).round(1)} min, avg time: #{(query.total_time / query.calls.to_f).round} ms, calls: #{query.calls}" if query.total_time - if tables.empty? - log "No candidate tables for indexes" - elsif query.explainable? && !query.high_cost? - log "Low initial cost: #{query.initial_cost}" - elsif query.explainable? - query_indexes = query.indexes || [] - log "Start: #{query.costs[0]}" - log "Pass1: #{query.costs[1]} : #{log_indexes(query.pass1_indexes || [])}" - log "Pass2: #{query.costs[2]} : #{log_indexes(query.pass2_indexes || [])}" - log "Final: #{query.new_cost} : #{log_indexes(query_indexes)}" - if query_indexes.any? && !query.suggest_index - log "Need 50% cost savings to suggest index" - end - elsif query.fingerprint == "unknown" - log "Could not parse query" - elsif query.tables.empty? - log "No tables" - elsif query.missing_tables - log "Tables not present in current database" - else - log "Could not run explain" + # debug info + if @log_level.start_with?("debug") + index_queries = new_indexes.flat_map { |i| i[:queries].sort_by(&:fingerprint) } + if @log_level == "debug2" + fingerprints = Set.new(index_queries.map(&:fingerprint)) + index_queries.concat(queries.reject { |q| fingerprints.include?(q.fingerprint) }.sort_by(&:fingerprint)) + end + index_queries.each do |query| + log "-" * 80 + log "Query #{query.fingerprint}" + log "Total time: #{(query.total_time / 60000.0).round(1)} min, avg time: #{(query.total_time / query.calls.to_f).round} ms, calls: #{query.calls}" if query.total_time + if tables.empty? + log "No candidate tables for indexes" + elsif query.explainable? && !query.high_cost? + log "Low initial cost: #{query.initial_cost}" + elsif query.explainable? + query_indexes = query.indexes || [] + log "Start: #{query.costs[0]}" + log "Pass1: #{query.costs[1]} : #{log_indexes(query.pass1_indexes || [])}" + log "Pass2: #{query.costs[2]} : #{log_indexes(query.pass2_indexes || [])}" + log "Final: #{query.new_cost} : #{log_indexes(query.suggest_index ? query_indexes : [])}" + if query_indexes.any? && !query.suggest_index + log "Need 50% cost savings to suggest index" end - log - log query.statement - log + elsif query.fingerprint == "unknown" + log "Could not parse query" + elsif query.tables.empty? + log "No tables" + elsif query.missing_tables + log "Tables not present in current database" + else + log "Could not run explain" end + log + log query.statement + log end + end - if @create - # 1. create lock - # 2. refresh existing index list - # 3. create indexes that still don't exist - # 4. release lock - with_advisory_lock do - new_indexes.each do |index| - unless index_exists?(index) - statement = "CREATE INDEX CONCURRENTLY ON #{quote_ident(index[:table])} (#{index[:columns].map { |c| quote_ident(c) }.join(", ")})" - log "Creating index: #{statement}" - started_at = Time.now - begin - execute(statement) - log "Index created: #{((Time.now - started_at) * 1000).to_i} ms" - rescue PG::LockNotAvailable => e - log "Could not acquire lock: #{index[:table]}" - end + # create + if @create && new_indexes.any? + # 1. create lock + # 2. refresh existing index list + # 3. create indexes that still don't exist + # 4. release lock + with_advisory_lock do + new_indexes.each do |index| + unless index_exists?(index) + statement = "CREATE INDEX CONCURRENTLY ON #{quote_ident(index[:table])} (#{index[:columns].map { |c| quote_ident(c) }.join(", ")})" + log "Creating index: #{statement}" + started_at = Time.now + begin + execute(statement) + log "Index created: #{((Time.now - started_at) * 1000).to_i} ms" + rescue PG::LockNotAvailable + log "Could not acquire lock: #{index[:table]}" end end end end - else - log "No new indexes found" end new_indexes end @@ -415,11 +442,11 @@ end def database_tables result = execute <<-SQL SELECT - table_name + table_schema || '.' || table_name AS table_name FROM information_schema.tables WHERE table_catalog = current_database() AND table_schema NOT IN ('pg_catalog', 'information_schema') @@ -437,20 +464,17 @@ INNER JOIN pg_database ON pg_database.oid = pg_stat_statements.dbid WHERE datname = current_database() AND total_time >= #{@min_time * 60000} + AND calls >= #{@min_calls} ORDER BY 1 SQL result.map { |q| q["query"] } end - def possible_tables(queries) - Set.new(queries.flat_map(&:tables).uniq & database_tables) - end - def with_advisory_lock lock_id = 123456 first_time = true while execute("SELECT pg_try_advisory_lock(#{lock_id})").first["pg_try_advisory_lock"] != "t" if first_time @@ -478,30 +502,28 @@ end def columns(tables) columns = execute <<-SQL SELECT - table_name, + table_schema || '.' || table_name AS table_name, column_name, data_type FROM information_schema.columns WHERE - table_schema = 'public' AND - table_name IN (#{tables.map { |t| quote(t) }.join(", ")}) + table_schema || '.' || table_name IN (#{tables.map { |t| quote(t) }.join(", ")}) ORDER BY 1, 2 SQL columns.map { |v| {table: v["table_name"], column: v["column_name"], type: v["data_type"]} } end def indexes(tables) execute(<<-SQL SELECT - schemaname AS schema, - t.relname AS table, + schemaname || '.' || t.relname AS table, ix.relname AS name, regexp_replace(pg_get_indexdef(i.indexrelid), '^[^\\(]*\\((.*)\\)$', '\\1') AS columns, regexp_replace(pg_get_indexdef(i.indexrelid), '.* USING ([^ ]*) \\(.*', '\\1') AS using FROM pg_index i @@ -510,30 +532,33 @@ INNER JOIN pg_class ix ON ix.oid = i.indexrelid LEFT JOIN pg_stat_user_indexes ui ON ui.indexrelid = i.indexrelid WHERE - t.relname IN (#{tables.map { |t| quote(t) }.join(", ")}) AND - schemaname IS NOT NULL AND + schemaname || '.' || t.relname IN (#{tables.map { |t| quote(t) }.join(", ")}) AND indisvalid = 't' AND indexprs IS NULL AND indpred IS NULL ORDER BY 1, 2 SQL ).map { |v| v["columns"] = v["columns"].sub(") WHERE (", " WHERE ").split(", ").map { |c| unquote(c) }; v } end + def search_path + execute("SHOW search_path")[0]["search_path"].split(",").map(&:strip) + end + def unquote(part) - if part && part.start_with?('"') + if part && part.start_with?('"') && part.end_with?('"') part[1..-2] else part end end def quote_ident(value) - conn.quote_ident(value) + value.split(".").map { |v| conn.quote_ident(v) }.join(".") end def quote(value) if value.is_a?(String) "'#{quote_string(value)}'"