lib/dexter/indexer.rb in pgdexter-0.4.3 vs lib/dexter/indexer.rb in pgdexter-0.5.0

- old
+ new

@@ -12,71 +12,28 @@
       @log_explain = options[:log_explain]
       @min_time = options[:min_time] || 0
       @min_calls = options[:min_calls] || 0
       @analyze = options[:analyze]
       @min_cost_savings_pct = options[:min_cost_savings_pct].to_i
-      @log_table = options[:log_table]
       @options = options
       @mutex = Mutex.new

-      create_extension unless extension_exists?
+      if server_version_num < 110000
+        raise Dexter::Abort, "This version of Dexter requires Postgres 11+"
+      end
+
+      check_extension
+
       execute("SET lock_timeout = '5s'")
     end

     def process_stat_statements
       queries = stat_statements.map { |q| Query.new(q) }.sort_by(&:fingerprint).group_by(&:fingerprint).map { |_, v| v.first }
       log "Processing #{queries.size} new query fingerprints"
       process_queries(queries)
     end

-    def stat_activity
-      execute <<~SQL
-        SELECT
-          pid || ':' || COALESCE(query_start, xact_start) AS id,
-          query,
-          EXTRACT(EPOCH FROM NOW() - COALESCE(query_start, xact_start)) * 1000.0 AS duration_ms
-        FROM
-          pg_stat_activity
-        WHERE
-          datname = current_database()
-          AND state = 'active'
-          AND pid != pg_backend_pid()
-        ORDER BY
-          1
-      SQL
-    end
-
-    # works with
-    # file_fdw: https://www.postgresql.org/docs/current/file-fdw.html
-    # log_fdw: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.PostgreSQL.CommonDBATasks.Extensions.foreign-data-wrappers.html
-    def csvlog_activity(last_log_time)
-      query = <<~SQL
-        SELECT
-          log_time,
-          message,
-          detail
-        FROM
-          #{conn.quote_ident(@log_table)}
-        WHERE
-          log_time >= \$1
-      SQL
-      execute(query, params: [last_log_time])
-    end
-
-    # works with
-    # file_fdw: https://www.postgresql.org/docs/current/file-fdw.html
-    # log_fdw: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.PostgreSQL.CommonDBATasks.Extensions.foreign-data-wrappers.html
-    def stderr_activity
-      query = <<~SQL
-        SELECT
-          log_entry
-        FROM
-          #{conn.quote_ident(@log_table)}
-      SQL
-      execute(query)
-    end
-
     def process_queries(queries)
       # reset hypothetical indexes
       reset_hypothetical_indexes

       tables = Set.new(database_tables + materialized_views)

@@ -148,42 +105,44 @@
       show_and_create_indexes(new_indexes, queries)
     end

     private

-    def create_extension
-      execute("SET client_min_messages = warning")
-      begin
-        execute("CREATE EXTENSION IF NOT EXISTS hypopg")
-      rescue PG::UndefinedFile
+    def check_extension
+      extension = execute("SELECT installed_version FROM pg_available_extensions WHERE name = 'hypopg'").first
+
+      if extension.nil?
         raise Dexter::Abort, "Install HypoPG first: https://github.com/ankane/dexter#installation"
-      rescue PG::InsufficientPrivilege
-        raise Dexter::Abort, "Use a superuser to run: CREATE EXTENSION hypopg"
       end
-    end

-    def extension_exists?
-      execute("SELECT * FROM pg_available_extensions WHERE name = 'hypopg' AND installed_version IS NOT NULL").any?
+      if extension["installed_version"].nil?
+        if @options[:enable_hypopg]
+          execute("CREATE EXTENSION hypopg")
+        else
+          raise Dexter::Abort, "Run `CREATE EXTENSION hypopg` or pass --enable-hypopg"
+        end
+      end
     end

     def reset_hypothetical_indexes
       execute("SELECT hypopg_reset()")
     end

     def analyze_tables(tables)
       tables = tables.to_a.sort

-      analyze_stats = execute <<~SQL
+      query = <<~SQL
         SELECT
           schemaname || '.' || relname AS table,
           last_analyze,
           last_autoanalyze
         FROM
           pg_stat_user_tables
         WHERE
-          schemaname || '.' || relname IN (#{tables.map { |t| quote(t) }.join(", ")})
+          schemaname || '.' || relname IN (#{tables.size.times.map { |i| "$#{i + 1}" }.join(", ")})
       SQL
+      analyze_stats = execute(query, params: tables.to_a)

       last_analyzed = {}
       analyze_stats.each do |stats|
         last_analyzed[stats["table"]] = Time.parse(stats["last_analyze"]) if stats["last_analyze"]
       end
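Note on the hunks above: everywhere the old code interpolated values into SQL through quote(t), the new code builds numbered placeholders ($1..$n) and binds the values with exec_params, so escaping happens in the driver rather than in string helpers. A minimal standalone sketch of that pattern (not from the gem; the connection settings and table list are made up for illustration):

    require "pg"

    conn = PG.connect(dbname: "mydb")
    tables = ["public.users", "public.orders"]

    # one numbered placeholder per value: $1, $2, ...
    placeholders = tables.size.times.map { |i| "$#{i + 1}" }.join(", ")
    sql = <<~SQL
      SELECT
        schemaname || '.' || relname AS table
      FROM
        pg_stat_user_tables
      WHERE
        schemaname || '.' || relname IN (#{placeholders})
    SQL

    # the values travel separately from the SQL text, so no client-side quoting is needed
    conn.exec_params(sql, tables).each { |row| puts row["table"] }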
@@ -210,14 +169,10 @@
         puts "Explaining query"
         puts
       end

       begin
         query.plans << plan(query.statement)
-        if @log_explain
-          # Pass format to prevent ANALYZE
-          puts execute("EXPLAIN (FORMAT TEXT) #{safe_statement(query.statement)}", pretty: false).map { |r| r["QUERY PLAN"] }.join("\n")
-        end
       rescue PG::Error, JSON::NestingError => e
         if @log_explain
           log e.message
         end
       end

@@ -243,11 +198,11 @@
       explainable_queries.each do |query|
         log "Finding columns: #{query.statement}" if @log_level == "debug3"
         find_columns(query.tree).each do |col|
           last_col = col["fields"].last
           if last_col["String"]
-            possible_columns << last_col["String"]["str"]
+            possible_columns << last_col["String"]["sval"]
           end
         end
       end

       # create hypothetical indexes

@@ -539,11 +494,11 @@
     end

     def conn
       @conn ||= begin
         # set connect timeout if none set
-        ENV["PGCONNECT_TIMEOUT"] ||= "2"
+        ENV["PGCONNECT_TIMEOUT"] ||= "3"

         if @options[:dbname] =~ /\Apostgres(ql)?:\/\//
           config = @options[:dbname]
         else
           config = {

@@ -567,20 +522,60 @@
       # (There can be semicolons in it, but not more than one nonempty command.)
       # This is a limitation of the underlying protocol, but has some usefulness
       # as an extra defense against SQL-injection attacks.
       # https://www.postgresql.org/docs/current/static/libpq-exec.html
       query = squish(query) if pretty
-      log colorize("[sql] #{query}", :cyan) if @log_sql
+      log colorize("[sql] #{query}#{params.any? ? " /*#{params.to_json}*/" : ""}", :cyan) if @log_sql

       @mutex.synchronize do
-        conn.exec_params(query, params).to_a
+        conn.exec_params("#{query} /*dexter*/", params).to_a
       end
     end

     def plan(query)
+      prepared = false
+      transaction = false
+
+      # try to EXPLAIN normalized queries
+      # https://dev.to/yugabyte/explain-from-pgstatstatements-normalized-queries-how-to-always-get-the-generic-plan-in--5cfi
+      explain_normalized = query.include?("$1")
+      if explain_normalized
+        prepared_name = "dexter_prepared"
+        execute("PREPARE #{prepared_name} AS #{safe_statement(query)}", pretty: false)
+        prepared = true
+        params = execute("SELECT array_length(parameter_types, 1) AS params FROM pg_prepared_statements WHERE name = $1", params: [prepared_name]).first["params"].to_i
+        query = "EXECUTE #{prepared_name}(#{params.times.map { "NULL" }.join(", ")})"
+
+        execute("BEGIN")
+        transaction = true
+
+        if server_version_num >= 120000
+          execute("SET LOCAL plan_cache_mode = force_generic_plan")
+        else
+          execute("SET LOCAL cpu_operator_cost = 1e42")
+          5.times do
+            execute("EXPLAIN (FORMAT JSON) #{safe_statement(query)}", pretty: false)
+          end
+          execute("ROLLBACK")
+          execute("BEGIN")
+        end
+      end
+
       # strip semi-colons as another measure of defense
-      JSON.parse(execute("EXPLAIN (FORMAT JSON) #{safe_statement(query)}", pretty: false).first["QUERY PLAN"], max_nesting: 1000).first["Plan"]
+      plan = JSON.parse(execute("EXPLAIN (FORMAT JSON) #{safe_statement(query)}", pretty: false).first["QUERY PLAN"], max_nesting: 1000).first["Plan"]
+
+      if @log_explain
+        # Pass format to prevent ANALYZE
+        puts execute("EXPLAIN (FORMAT TEXT) #{safe_statement(query)}", pretty: false).map { |r| r["QUERY PLAN"] }.join("\n")
+      end
+
+      plan
+    ensure
+      if explain_normalized
+        execute("ROLLBACK") if transaction
+        execute("DEALLOCATE #{prepared_name}") if prepared
+      end
     end

     # TODO for multicolumn indexes, use ordering
     def create_hypothetical_indexes_helper(columns_by_table, n, candidates)
       columns_by_table.each do |table, cols|
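The rewritten plan method above EXPLAINs normalized pg_stat_statements queries (the ones containing $1) by preparing them and forcing a generic plan, so NULL can stand in for every parameter. A standalone sketch of that technique for Postgres 12+ (the statement name and query are made up; the gem additionally covers Postgres 11 with the cpu_operator_cost trick shown in the diff):

    require "pg"

    conn = PG.connect(dbname: "mydb")
    normalized = "SELECT * FROM pg_class WHERE oid = $1" # as pg_stat_statements reports it

    conn.exec("BEGIN")
    conn.exec("PREPARE plan_sketch AS #{normalized}")
    # with force_generic_plan, the planner ignores the bound values,
    # so NULL works as a stand-in for every placeholder
    conn.exec("SET LOCAL plan_cache_mode = force_generic_plan")
    conn.exec("EXPLAIN EXECUTE plan_sketch(NULL)").each { |r| puts r["QUERY PLAN"] }
    conn.exec("ROLLBACK")
    # prepared statements are session-scoped rather than transactional,
    # which is why the gem also deallocates after its rollback
    conn.exec("DEALLOCATE plan_sketch")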
@@ -606,21 +601,17 @@
       SQL
       result.map { |r| r["table_name"] }
     end

     def materialized_views
-      if server_version_num >= 90300
-        result = execute <<~SQL
-          SELECT
-            schemaname || '.' || matviewname AS table_name
-          FROM
-            pg_matviews
-        SQL
-        result.map { |r| r["table_name"] }
-      else
-        []
-      end
+      result = execute <<~SQL
+        SELECT
+          schemaname || '.' || matviewname AS table_name
+        FROM
+          pg_matviews
+      SQL
+      result.map { |r| r["table_name"] }
     end

     def server_version_num
       execute("SHOW server_version_num").first["server_version_num"].to_i
     end

@@ -650,71 +641,74 @@
       view_tables
     end

     def stat_statements
       total_time = server_version_num >= 130000 ? "(total_plan_time + total_exec_time)" : "total_time"
-      result = execute <<~SQL
+      sql = <<~SQL
         SELECT
           DISTINCT query
         FROM
           pg_stat_statements
         INNER JOIN
           pg_database ON pg_database.oid = pg_stat_statements.dbid
         WHERE
           datname = current_database()
-          AND #{total_time} >= #{@min_time * 60000}
-          AND calls >= #{@min_calls}
+          AND #{total_time} >= \$1
+          AND calls >= \$2
         ORDER BY
           1
       SQL
-      result.map { |q| q["query"] }
+      execute(sql, params: [@min_time * 60000, @min_calls]).map { |q| q["query"] }
     end

     def with_advisory_lock
       lock_id = 123456
       first_time = true
-      while execute("SELECT pg_try_advisory_lock(#{lock_id})").first["pg_try_advisory_lock"] != "t"
+      while execute("SELECT pg_try_advisory_lock($1)", params: [lock_id]).first["pg_try_advisory_lock"] != "t"
         if first_time
           log "Waiting for lock..."
           first_time = false
         end
         sleep(1)
       end
       yield
     ensure
-      with_min_messages("error") do
-        execute("SELECT pg_advisory_unlock(#{lock_id})")
+      suppress_messages do
+        execute("SELECT pg_advisory_unlock($1)", params: [lock_id])
       end
     end

-    def with_min_messages(value)
-      execute("SET client_min_messages = #{quote(value)}")
+    def suppress_messages
+      conn.set_notice_processor do |message|
+        # do nothing
+      end
       yield
     ensure
-      execute("SET client_min_messages = warning")
+      # clear notice processor
+      conn.set_notice_processor
     end

     def index_exists?(index)
       indexes([index[:table]]).find { |i| i["columns"] == index[:columns] }
     end

     def columns(tables)
-      columns = execute <<~SQL
+      query = <<~SQL
         SELECT
           s.nspname || '.' || t.relname AS table_name,
           a.attname AS column_name,
           pg_catalog.format_type(a.atttypid, a.atttypmod) AS data_type
         FROM pg_attribute a
           JOIN pg_class t on a.attrelid = t.oid
           JOIN pg_namespace s on t.relnamespace = s.oid
         WHERE a.attnum > 0
           AND NOT a.attisdropped
-          AND s.nspname || '.' || t.relname IN (#{tables.map { |t| quote(t) }.join(", ")})
+          AND s.nspname || '.' || t.relname IN (#{tables.size.times.map { |i| "$#{i + 1}" }.join(", ")})
         ORDER BY
           1, 2
       SQL
-
+      columns = execute(query, params: tables.to_a)
       columns.map { |v| {table: v["table_name"], column: v["column_name"], type: v["data_type"]} }
     end

     def indexes(tables)
       query = <<~SQL
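The new suppress_messages above replaces the old server-side SET client_min_messages round trips with the pg gem's client-side notice processor, leaving session settings untouched. A standalone sketch (the lock id is arbitrary; unlocking an advisory lock you do not hold is just a convenient way to trigger a WARNING):

    require "pg"

    conn = PG.connect(dbname: "mydb")

    # drop NOTICE/WARNING text on the client instead of changing client_min_messages
    conn.set_notice_processor { |message| } # do nothing

    conn.exec("SELECT pg_advisory_unlock(123456)") # the warning is swallowed

    # calling with no block restores the default processor (print to stderr)
    conn.set_notice_processor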
@@ -730,18 +724,18 @@
       INNER JOIN
         pg_class ix ON ix.oid = i.indexrelid
       LEFT JOIN
         pg_stat_user_indexes ui ON ui.indexrelid = i.indexrelid
       WHERE
-        schemaname || '.' || t.relname IN (#{tables.map { |t| quote(t) }.join(", ")}) AND
+        schemaname || '.' || t.relname IN (#{tables.size.times.map { |i| "$#{i + 1}" }.join(", ")}) AND
         indisvalid = 't' AND
         indexprs IS NULL AND
         indpred IS NULL
       ORDER BY
         1, 2
     SQL
-      execute(query).map { |v| v["columns"] = v["columns"].sub(") WHERE (", " WHERE ").split(", ").map { |c| unquote(c) }; v }
+      execute(query, params: tables.to_a).map { |v| v["columns"] = v["columns"].sub(") WHERE (", " WHERE ").split(", ").map { |c| unquote(c) }; v }
     end

     def search_path
       execute("SELECT current_schemas(true)")[0]["current_schemas"][1..-2].split(",")
     end

@@ -754,22 +748,9 @@
       end
     end

     def quote_ident(value)
       value.split(".").map { |v| conn.quote_ident(v) }.join(".")
-    end
-
-    def quote(value)
-      if value.is_a?(String)
-        "'#{quote_string(value)}'"
-      else
-        value
-      end
-    end
-
-    # from activerecord
-    def quote_string(s)
-      s.gsub(/\\/, '\&\&').gsub(/'/, "''")
     end

     # from activesupport
     def squish(str)
       str.to_s.gsub(/\A[[:space:]]+/, "").gsub(/[[:space:]]+\z/, "").gsub(/[[:space:]]+/, " ")
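With every value now bound as a parameter, the quote and quote_string helpers removed in the last hunk have no remaining callers; only quote_ident survives, because identifiers (schema, table, and index names) cannot be sent as $n parameters. A standalone sketch of the split that remains (the input value is made up for illustration):

    require "pg"

    conn = PG.connect(dbname: "mydb")

    # identifiers are escaped into the SQL text via quote_ident
    table = "pg_catalog.pg_class"
    quoted = table.split(".").map { |v| conn.quote_ident(v) }.join(".")

    # values go through exec_params, so no quote_string-style escaping is needed
    conn.exec_params("SELECT relname FROM #{quoted} WHERE relname = $1", ["O'Brien"])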