lib/dexter/indexer.rb in pgdexter-0.4.3 vs lib/dexter/indexer.rb in pgdexter-0.5.0
- removed (present in 0.4.3 only)
+ added (present in 0.5.0 only)
@@ -12,71 +12,28 @@
@log_explain = options[:log_explain]
@min_time = options[:min_time] || 0
@min_calls = options[:min_calls] || 0
@analyze = options[:analyze]
@min_cost_savings_pct = options[:min_cost_savings_pct].to_i
- @log_table = options[:log_table]
@options = options
@mutex = Mutex.new
- create_extension unless extension_exists?
+ if server_version_num < 110000
+ raise Dexter::Abort, "This version of Dexter requires Postgres 11+"
+ end
+
+ check_extension
+
execute("SET lock_timeout = '5s'")
end
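
The constructor now fails fast on old servers. server_version_num is Postgres's single-integer version (major * 10000 + minor, so 11.0 reports 110000 and 15.2 reports 150002). A minimal standalone sketch of the same gate, using the pg gem and a hypothetical database name:

    require "pg"

    conn = PG.connect(dbname: "mydb") # hypothetical connection
    version = conn.exec("SHOW server_version_num").first["server_version_num"].to_i
    # 110000 => 11.0, 150002 => 15.2
    raise "Postgres 11+ required" if version < 110000
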
def process_stat_statements
queries = stat_statements.map { |q| Query.new(q) }.sort_by(&:fingerprint).group_by(&:fingerprint).map { |_, v| v.first }
log "Processing #{queries.size} new query fingerprints"
process_queries(queries)
end
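
The dedup idiom above is compact: sort by fingerprint, group, keep one query per group. A self-contained sketch with a stand-in Struct (Dexter's real Query class wraps pg_query fingerprints):

    Query = Struct.new(:statement, :fingerprint) # stand-in for Dexter's Query

    queries = [
      Query.new("SELECT * FROM users WHERE id = 1", "abc"),
      Query.new("SELECT * FROM users WHERE id = 2", "abc"),
      Query.new("SELECT * FROM posts", "def")
    ]
    queries.sort_by(&:fingerprint).group_by(&:fingerprint).map { |_, v| v.first }
    # => two queries, one per fingerprint
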
- def stat_activity
- execute <<~SQL
- SELECT
- pid || ':' || COALESCE(query_start, xact_start) AS id,
- query,
- EXTRACT(EPOCH FROM NOW() - COALESCE(query_start, xact_start)) * 1000.0 AS duration_ms
- FROM
- pg_stat_activity
- WHERE
- datname = current_database()
- AND state = 'active'
- AND pid != pg_backend_pid()
- ORDER BY
- 1
- SQL
- end
-
- # works with
- # file_fdw: https://www.postgresql.org/docs/current/file-fdw.html
- # log_fdw: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.PostgreSQL.CommonDBATasks.Extensions.foreign-data-wrappers.html
- def csvlog_activity(last_log_time)
- query = <<~SQL
- SELECT
- log_time,
- message,
- detail
- FROM
- #{conn.quote_ident(@log_table)}
- WHERE
- log_time >= \$1
- SQL
- execute(query, params: [last_log_time])
- end
-
- # works with
- # file_fdw: https://www.postgresql.org/docs/current/file-fdw.html
- # log_fdw: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.PostgreSQL.CommonDBATasks.Extensions.foreign-data-wrappers.html
- def stderr_activity
- query = <<~SQL
- SELECT
- log_entry
- FROM
- #{conn.quote_ident(@log_table)}
- SQL
- execute(query)
- end
-
def process_queries(queries)
# reset hypothetical indexes
reset_hypothetical_indexes
tables = Set.new(database_tables + materialized_views)
@@ -148,42 +105,44 @@
show_and_create_indexes(new_indexes, queries)
end
private
- def create_extension
- execute("SET client_min_messages = warning")
- begin
- execute("CREATE EXTENSION IF NOT EXISTS hypopg")
- rescue PG::UndefinedFile
+ def check_extension
+ extension = execute("SELECT installed_version FROM pg_available_extensions WHERE name = 'hypopg'").first
+
+ if extension.nil?
raise Dexter::Abort, "Install HypoPG first: https://github.com/ankane/dexter#installation"
- rescue PG::InsufficientPrivilege
- raise Dexter::Abort, "Use a superuser to run: CREATE EXTENSION hypopg"
end
- end
- def extension_exists?
- execute("SELECT * FROM pg_available_extensions WHERE name = 'hypopg' AND installed_version IS NOT NULL").any?
+ if extension["installed_version"].nil?
+ if @options[:enable_hypopg]
+ execute("CREATE EXTENSION hypopg")
+ else
+ raise Dexter::Abort, "Run `CREATE EXTENSION hypopg` or pass --enable-hypopg"
+ end
+ end
end
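
check_extension separates two failure modes that the old rescue-based create_extension conflated: HypoPG absent from the server entirely (no row in pg_available_extensions) versus present but not enabled in this database (a row whose installed_version is NULL). Roughly, assuming an open pg connection:

    require "pg"
    conn = PG.connect(dbname: "mydb") # hypothetical connection

    row = conn.exec("SELECT installed_version FROM pg_available_extensions WHERE name = 'hypopg'").first
    if row.nil?
      abort "HypoPG is not installed on the server"
    elsif row["installed_version"].nil?
      conn.exec("CREATE EXTENSION hypopg") # available, just not enabled here
    end
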
def reset_hypothetical_indexes
execute("SELECT hypopg_reset()")
end
def analyze_tables(tables)
tables = tables.to_a.sort
- analyze_stats = execute <<~SQL
+ query = <<~SQL
SELECT
schemaname || '.' || relname AS table,
last_analyze,
last_autoanalyze
FROM
pg_stat_user_tables
WHERE
- schemaname || '.' || relname IN (#{tables.map { |t| quote(t) }.join(", ")})
+ schemaname || '.' || relname IN (#{tables.size.times.map { |i| "$#{i + 1}" }.join(", ")})
SQL
+ analyze_stats = execute(query, params: tables.to_a)
last_analyzed = {}
analyze_stats.each do |stats|
last_analyzed[stats["table"]] = Time.parse(stats["last_analyze"]) if stats["last_analyze"]
end
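
Note the pattern replacing the old quoting helpers throughout this file: the only thing interpolated into the SQL text is a run of generated placeholders, and the table names themselves travel as bind parameters. For example:

    tables = ["public.users", "public.posts"]
    placeholders = tables.size.times.map { |i| "$#{i + 1}" }.join(", ")
    placeholders # => "$1, $2"
    # then: execute("... IN (#{placeholders})", params: tables)
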
@@ -210,14 +169,10 @@
puts "Explaining query"
puts
end
begin
query.plans << plan(query.statement)
- if @log_explain
- # Pass format to prevent ANALYZE
- puts execute("EXPLAIN (FORMAT TEXT) #{safe_statement(query.statement)}", pretty: false).map { |r| r["QUERY PLAN"] }.join("\n")
- end
rescue PG::Error, JSON::NestingError => e
if @log_explain
log e.message
end
end
@@ -243,11 +198,11 @@
explainable_queries.each do |query|
log "Finding columns: #{query.statement}" if @log_level == "debug3"
find_columns(query.tree).each do |col|
last_col = col["fields"].last
if last_col["String"]
- possible_columns << last_col["String"]["str"]
+ possible_columns << last_col["String"]["sval"]
end
end
end
# create hypothetical indexes
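
The str to sval rename tracks the upstream parser: Postgres 15 renamed the value field of String parse nodes from str to sval, and newer releases of pg_query (which Dexter uses for parsing) expose the new name. An abridged sketch of the node shape being walked, values hypothetical:

    col = {"fields" => [{"String" => {"sval" => "user_id"}}]}
    last_col = col["fields"].last
    last_col["String"]["sval"] # => "user_id"
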
@@ -539,11 +494,11 @@
end
def conn
@conn ||= begin
# set connect timeout if none set
- ENV["PGCONNECT_TIMEOUT"] ||= "2"
+ ENV["PGCONNECT_TIMEOUT"] ||= "3"
if @options[:dbname] =~ /\Apostgres(ql)?:\/\//
config = @options[:dbname]
else
config = {
@@ -567,20 +522,60 @@
# (There can be semicolons in it, but not more than one nonempty command.)
# This is a limitation of the underlying protocol, but has some usefulness
# as an extra defense against SQL-injection attacks.
# https://www.postgresql.org/docs/current/static/libpq-exec.html
query = squish(query) if pretty
- log colorize("[sql] #{query}", :cyan) if @log_sql
+ log colorize("[sql] #{query}#{params.any? ? " /*#{params.to_json}*/" : ""}", :cyan) if @log_sql
@mutex.synchronize do
- conn.exec_params(query, params).to_a
+ conn.exec_params("#{query} /*dexter*/", params).to_a
end
end
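
Two observability wins land here: logged SQL now carries its bind values as a JSON comment, and every statement Dexter sends is tagged with a trailing /*dexter*/ marker, so the tool's own traffic is easy to find or exclude in pg_stat_activity and pg_stat_statements. For example, from another session:

    require "pg"
    conn = PG.connect(dbname: "mydb") # hypothetical connection

    conn.exec_params(
      "SELECT pid, query FROM pg_stat_activity WHERE query LIKE $1",
      ["%/*dexter*/%"]
    ).each { |row| puts "#{row["pid"]}: #{row["query"]}" }
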
def plan(query)
+ prepared = false
+ transaction = false
+
+ # try to EXPLAIN normalized queries
+ # https://dev.to/yugabyte/explain-from-pgstatstatements-normalized-queries-how-to-always-get-the-generic-plan-in--5cfi
+ explain_normalized = query.include?("$1")
+ if explain_normalized
+ prepared_name = "dexter_prepared"
+ execute("PREPARE #{prepared_name} AS #{safe_statement(query)}", pretty: false)
+ prepared = true
+ params = execute("SELECT array_length(parameter_types, 1) AS params FROM pg_prepared_statements WHERE name = $1", params: [prepared_name]).first["params"].to_i
+ query = "EXECUTE #{prepared_name}(#{params.times.map { "NULL" }.join(", ")})"
+
+ execute("BEGIN")
+ transaction = true
+
+ if server_version_num >= 120000
+ execute("SET LOCAL plan_cache_mode = force_generic_plan")
+ else
+ execute("SET LOCAL cpu_operator_cost = 1e42")
+ 5.times do
+ execute("EXPLAIN (FORMAT JSON) #{safe_statement(query)}", pretty: false)
+ end
+ execute("ROLLBACK")
+ execute("BEGIN")
+ end
+ end
+
# strip semi-colons as another measure of defense
- JSON.parse(execute("EXPLAIN (FORMAT JSON) #{safe_statement(query)}", pretty: false).first["QUERY PLAN"], max_nesting: 1000).first["Plan"]
+ plan = JSON.parse(execute("EXPLAIN (FORMAT JSON) #{safe_statement(query)}", pretty: false).first["QUERY PLAN"], max_nesting: 1000).first["Plan"]
+
+ if @log_explain
+ # Pass format to prevent ANALYZE
+ puts execute("EXPLAIN (FORMAT TEXT) #{safe_statement(query)}", pretty: false).map { |r| r["QUERY PLAN"] }.join("\n")
+ end
+
+ plan
+ ensure
+ if explain_normalized
+ execute("ROLLBACK") if transaction
+ execute("DEALLOCATE #{prepared_name}") if prepared
+ end
end
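
This implements the technique from the linked article: a normalized query from pg_stat_statements has $1-style placeholders and no values, but it can still be planned by PREPARE-ing it and explaining an EXECUTE with all-NULL arguments. On Postgres 12+, plan_cache_mode = force_generic_plan guarantees the generic plan; on 11, the code instead inflates cpu_operator_cost and burns five custom plans so the plan cache switches to the generic one. The 12+ path, reduced to its core (statement hypothetical):

    require "pg"
    conn = PG.connect(dbname: "mydb") # hypothetical connection

    conn.exec("PREPARE dexter_prepared AS SELECT * FROM users WHERE id = $1")
    conn.exec("BEGIN")
    conn.exec("SET LOCAL plan_cache_mode = force_generic_plan") # Postgres 12+
    puts conn.exec("EXPLAIN (FORMAT JSON) EXECUTE dexter_prepared(NULL)").first["QUERY PLAN"]
    conn.exec("ROLLBACK") # also clears the SET LOCAL
    conn.exec("DEALLOCATE dexter_prepared")
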
# TODO for multicolumn indexes, use ordering
def create_hypothetical_indexes_helper(columns_by_table, n, candidates)
columns_by_table.each do |table, cols|
@@ -606,21 +601,17 @@
SQL
result.map { |r| r["table_name"] }
end
def materialized_views
- if server_version_num >= 90300
- result = execute <<~SQL
- SELECT
- schemaname || '.' || matviewname AS table_name
- FROM
- pg_matviews
- SQL
- result.map { |r| r["table_name"] }
- else
- []
- end
+ result = execute <<~SQL
+ SELECT
+ schemaname || '.' || matviewname AS table_name
+ FROM
+ pg_matviews
+ SQL
+ result.map { |r| r["table_name"] }
end
def server_version_num
execute("SHOW server_version_num").first["server_version_num"].to_i
end
@@ -650,71 +641,74 @@
view_tables
end
def stat_statements
total_time = server_version_num >= 130000 ? "(total_plan_time + total_exec_time)" : "total_time"
- result = execute <<~SQL
+ sql = <<~SQL
SELECT
DISTINCT query
FROM
pg_stat_statements
INNER JOIN
pg_database ON pg_database.oid = pg_stat_statements.dbid
WHERE
datname = current_database()
- AND #{total_time} >= #{@min_time * 60000}
- AND calls >= #{@min_calls}
+ AND #{total_time} >= \$1
+ AND calls >= \$2
ORDER BY
1
SQL
- result.map { |q| q["query"] }
+ execute(sql, params: [@min_time * 60000, @min_calls]).map { |q| q["query"] }
end
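
One unit detail worth keeping in mind: --min-time is given in minutes, while pg_stat_statements reports time in milliseconds, hence the multiply by 60000 before the value is bound:

    min_time_minutes = 5
    min_time_minutes * 60000 # => 300000 ms
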
def with_advisory_lock
lock_id = 123456
first_time = true
- while execute("SELECT pg_try_advisory_lock(#{lock_id})").first["pg_try_advisory_lock"] != "t"
+ while execute("SELECT pg_try_advisory_lock($1)", params: [lock_id]).first["pg_try_advisory_lock"] != "t"
if first_time
log "Waiting for lock..."
first_time = false
end
sleep(1)
end
yield
ensure
- with_min_messages("error") do
- execute("SELECT pg_advisory_unlock(#{lock_id})")
+ suppress_messages do
+ execute("SELECT pg_advisory_unlock($1)", params: [lock_id])
end
end
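
pg_try_advisory_lock keys on an arbitrary application-chosen bigint and returns immediately, and the pg gem's text results surface the boolean as the string "t" or "f" (hence the != "t" comparison in the loop). A minimal usage sketch:

    require "pg"
    conn = PG.connect(dbname: "mydb") # hypothetical connection

    lock_id = 123456
    locked = conn.exec_params("SELECT pg_try_advisory_lock($1)", [lock_id])
      .first["pg_try_advisory_lock"] == "t"
    begin
      puts "doing exclusive work" if locked
    ensure
      conn.exec_params("SELECT pg_advisory_unlock($1)", [lock_id]) if locked
    end
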
- def with_min_messages(value)
- execute("SET client_min_messages = #{quote(value)}")
+ def suppress_messages
+ conn.set_notice_processor do |message|
+ # do nothing
+ end
yield
ensure
- execute("SET client_min_messages = warning")
+ # clear notice processor
+ conn.set_notice_processor
end
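
suppress_messages trades a server-side setting (client_min_messages, which costs a round trip and can leak if the reset fails) for the pg gem's client-side notice processor: a block swallows NOTICE/WARNING text, and calling set_notice_processor with no block restores the default. Usage sketch:

    require "pg"
    conn = PG.connect(dbname: "mydb") # hypothetical connection

    conn.set_notice_processor { |message| } # drop notices
    conn.exec("SELECT pg_advisory_unlock(42)") # would WARN if the lock isn't held; now silent
    conn.set_notice_processor # no block: back to default handling
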
def index_exists?(index)
indexes([index[:table]]).find { |i| i["columns"] == index[:columns] }
end
def columns(tables)
- columns = execute <<~SQL
+ query = <<~SQL
SELECT
s.nspname || '.' || t.relname AS table_name,
a.attname AS column_name,
pg_catalog.format_type(a.atttypid, a.atttypmod) AS data_type
FROM pg_attribute a
JOIN pg_class t on a.attrelid = t.oid
JOIN pg_namespace s on t.relnamespace = s.oid
WHERE a.attnum > 0
AND NOT a.attisdropped
- AND s.nspname || '.' || t.relname IN (#{tables.map { |t| quote(t) }.join(", ")})
+ AND s.nspname || '.' || t.relname IN (#{tables.size.times.map { |i| "$#{i + 1}" }.join(", ")})
ORDER BY
1, 2
SQL
-
+ columns = execute(query, params: tables.to_a)
columns.map { |v| {table: v["table_name"], column: v["column_name"], type: v["data_type"]} }
end
def indexes(tables)
query = <<~SQL
@@ -730,18 +724,18 @@
INNER JOIN
pg_class ix ON ix.oid = i.indexrelid
LEFT JOIN
pg_stat_user_indexes ui ON ui.indexrelid = i.indexrelid
WHERE
- schemaname || '.' || t.relname IN (#{tables.map { |t| quote(t) }.join(", ")}) AND
+ schemaname || '.' || t.relname IN (#{tables.size.times.map { |i| "$#{i + 1}" }.join(", ")}) AND
indisvalid = 't' AND
indexprs IS NULL AND
indpred IS NULL
ORDER BY
1, 2
SQL
- execute(query).map { |v| v["columns"] = v["columns"].sub(") WHERE (", " WHERE ").split(", ").map { |c| unquote(c) }; v }
+ execute(query, params: tables.to_a).map { |v| v["columns"] = v["columns"].sub(") WHERE (", " WHERE ").split(", ").map { |c| unquote(c) }; v }
end
def search_path
execute("SELECT current_schemas(true)")[0]["current_schemas"][1..-2].split(",")
end
@@ -754,22 +748,9 @@
end
end
def quote_ident(value)
value.split(".").map { |v| conn.quote_ident(v) }.join(".")
- end
-
- def quote(value)
- if value.is_a?(String)
- "'#{quote_string(value)}'"
- else
- value
- end
- end
-
- # from activerecord
- def quote_string(s)
- s.gsub(/\\/, '\&\&').gsub(/'/, "''")
end
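
With every call site converted to bind parameters, the hand-rolled quote/quote_string pair (copied from ActiveRecord) has no remaining callers. The difference, with a hypothetical value:

    require "pg"
    conn = PG.connect(dbname: "mydb") # hypothetical connection

    name = "O'Brien"
    # 0.4.3 style: escape by hand and splice into the SQL text
    conn.exec("SELECT * FROM users WHERE name = '#{name.gsub("'", "''")}'")
    # 0.5.0 style: the server binds the value; no escaping involved
    conn.exec_params("SELECT * FROM users WHERE name = $1", [name])
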
# from activesupport
def squish(str)
str.to_s.gsub(/\A[[:space:]]+/, "").gsub(/[[:space:]]+\z/, "").gsub(/[[:space:]]+/, " ")