lib/dexter/indexer.rb in pgdexter-0.2.1 vs lib/dexter/indexer.rb in pgdexter-0.3.0
- old
+ new
@@ -8,10 +8,12 @@
@exclude_tables = options[:exclude]
@include_tables = Array(options[:include].split(",")) if options[:include]
@log_sql = options[:log_sql]
@log_explain = options[:log_explain]
@min_time = options[:min_time] || 0
+ @min_calls = options[:min_calls] || 0
+ @analyze = options[:analyze]
@options = options
create_extension unless extension_exists?
execute("SET lock_timeout = '5s'")
end
@@ -24,28 +26,43 @@
def process_queries(queries)
# reset hypothetical indexes
reset_hypothetical_indexes
- # filter queries from other databases and system tables
- tables = possible_tables(queries)
- queries.each do |query|
- query.missing_tables = !query.tables.all? { |t| tables.include?(t) }
- end
+ tables = Set.new(database_tables)
if @include_tables
- tables = Set.new(tables.to_a & @include_tables)
+ include_set = Set.new(@include_tables)
+ tables.keep_if { |t| include_set.include?(t) || include_set.include?(t.split(".")[-1]) }
end
- # exclude user specified tables
- # TODO exclude write-heavy tables
- @exclude_tables.each do |table|
- tables.delete(table)
+ if @exclude_tables.any?
+ exclude_set = Set.new(@exclude_tables)
+ tables.delete_if { |t| exclude_set.include?(t) || exclude_set.include?(t.split(".")[-1]) }
end
+ # map tables without schema to schema
+ no_schema_tables = {}
+ search_path_index = Hash[search_path.map.with_index.to_a]
+ tables.group_by { |t| t.split(".")[-1] }.each do |group, t2|
+ no_schema_tables[group] = t2.sort_by { |t| search_path_index[t.split(".")[0]] || 1000000 }[0]
+ end
+
+ # filter queries from other databases and system tables
+ queries.each do |query|
+ # add schema to table if needed
+ query.tables = query.tables.map { |t| no_schema_tables[t] || t }
+
+ # check for missing tables
+ query.missing_tables = !query.tables.all? { |t| tables.include?(t) }
+ end
+
+ # set tables
+ tables = Set.new(queries.reject(&:missing_tables).flat_map(&:tables))
+
# analyze tables if needed
- analyze_tables(tables) if tables.any?
+ analyze_tables(tables) if tables.any? && (@analyze || @log_level == "debug2")
# create hypothetical indexes and explain queries
candidates = tables.any? ? create_hypothetical_indexes(queries.reject(&:missing_tables), tables) : {}
# see if new indexes were used and meet bar
@@ -79,27 +96,33 @@
def analyze_tables(tables)
tables = tables.to_a.sort
analyze_stats = execute <<-SQL
SELECT
- schemaname AS schema,
- relname AS table,
+ schemaname || '.' || relname AS table,
last_analyze,
last_autoanalyze
FROM
pg_stat_user_tables
WHERE
- relname IN (#{tables.map { |t| quote(t) }.join(", ")})
+ schemaname || '.' || relname IN (#{tables.map { |t| quote(t) }.join(", ")})
SQL
last_analyzed = {}
analyze_stats.each do |stats|
last_analyzed[stats["table"]] = Time.parse(stats["last_analyze"]) if stats["last_analyze"]
end
tables.each do |table|
- if !last_analyzed[table] || last_analyzed[table] < Time.now - 3600
+ la = last_analyzed[table]
+
+ if @log_level == "debug2"
+ time_str = la ? la.iso8601 : "Unknown"
+ log "Last analyze: #{table} : #{time_str}"
+ end
+
+ if @analyze && (!la || la < Time.now - 3600)
statement = "ANALYZE #{quote_ident(table)}"
log "Running analyze: #{statement}"
execute(statement)
end
end
@@ -135,10 +158,11 @@
if tables.any?
# since every set of multi-column indexes are expensive
# try to parse out columns
possible_columns = Set.new
explainable_queries.each do |query|
+ log "Finding columns: #{query.statement}" if @log_level == "debug3"
find_columns(query.tree).each do |col|
last_col = col["fields"].last
if last_col["String"]
possible_columns << last_col["String"]["str"]
end
@@ -294,76 +318,79 @@
"None"
end
end
def show_and_create_indexes(new_indexes, queries, tables)
+ # print summary
if new_indexes.any?
new_indexes.each do |index|
log "Index found: #{index[:table]} (#{index[:columns].join(", ")})"
end
+ else
+ log "No new indexes found"
+ end
- if @log_level.start_with?("debug")
- index_queries = new_indexes.flat_map { |i| i[:queries].sort_by(&:fingerprint) }
- if @log_level == "debug2"
- fingerprints = Set.new(index_queries.map(&:fingerprint))
- index_queries.concat(queries.reject { |q| fingerprints.include?(q.fingerprint) }.sort_by(&:fingerprint))
- end
- index_queries.each do |query|
- log "-" * 80
- log "Query #{query.fingerprint}"
- log "Total time: #{(query.total_time / 60000.0).round(1)} min, avg time: #{(query.total_time / query.calls.to_f).round} ms, calls: #{query.calls}" if query.total_time
- if tables.empty?
- log "No candidate tables for indexes"
- elsif query.explainable? && !query.high_cost?
- log "Low initial cost: #{query.initial_cost}"
- elsif query.explainable?
- query_indexes = query.indexes || []
- log "Start: #{query.costs[0]}"
- log "Pass1: #{query.costs[1]} : #{log_indexes(query.pass1_indexes || [])}"
- log "Pass2: #{query.costs[2]} : #{log_indexes(query.pass2_indexes || [])}"
- log "Final: #{query.new_cost} : #{log_indexes(query_indexes)}"
- if query_indexes.any? && !query.suggest_index
- log "Need 50% cost savings to suggest index"
- end
- elsif query.fingerprint == "unknown"
- log "Could not parse query"
- elsif query.tables.empty?
- log "No tables"
- elsif query.missing_tables
- log "Tables not present in current database"
- else
- log "Could not run explain"
+ # debug info
+ if @log_level.start_with?("debug")
+ index_queries = new_indexes.flat_map { |i| i[:queries].sort_by(&:fingerprint) }
+ if @log_level == "debug2"
+ fingerprints = Set.new(index_queries.map(&:fingerprint))
+ index_queries.concat(queries.reject { |q| fingerprints.include?(q.fingerprint) }.sort_by(&:fingerprint))
+ end
+ index_queries.each do |query|
+ log "-" * 80
+ log "Query #{query.fingerprint}"
+ log "Total time: #{(query.total_time / 60000.0).round(1)} min, avg time: #{(query.total_time / query.calls.to_f).round} ms, calls: #{query.calls}" if query.total_time
+ if tables.empty?
+ log "No candidate tables for indexes"
+ elsif query.explainable? && !query.high_cost?
+ log "Low initial cost: #{query.initial_cost}"
+ elsif query.explainable?
+ query_indexes = query.indexes || []
+ log "Start: #{query.costs[0]}"
+ log "Pass1: #{query.costs[1]} : #{log_indexes(query.pass1_indexes || [])}"
+ log "Pass2: #{query.costs[2]} : #{log_indexes(query.pass2_indexes || [])}"
+ log "Final: #{query.new_cost} : #{log_indexes(query.suggest_index ? query_indexes : [])}"
+ if query_indexes.any? && !query.suggest_index
+ log "Need 50% cost savings to suggest index"
end
- log
- log query.statement
- log
+ elsif query.fingerprint == "unknown"
+ log "Could not parse query"
+ elsif query.tables.empty?
+ log "No tables"
+ elsif query.missing_tables
+ log "Tables not present in current database"
+ else
+ log "Could not run explain"
end
+ log
+ log query.statement
+ log
end
+ end
- if @create
- # 1. create lock
- # 2. refresh existing index list
- # 3. create indexes that still don't exist
- # 4. release lock
- with_advisory_lock do
- new_indexes.each do |index|
- unless index_exists?(index)
- statement = "CREATE INDEX CONCURRENTLY ON #{quote_ident(index[:table])} (#{index[:columns].map { |c| quote_ident(c) }.join(", ")})"
- log "Creating index: #{statement}"
- started_at = Time.now
- begin
- execute(statement)
- log "Index created: #{((Time.now - started_at) * 1000).to_i} ms"
- rescue PG::LockNotAvailable => e
- log "Could not acquire lock: #{index[:table]}"
- end
+ # create
+ if @create && new_indexes.any?
+ # 1. create lock
+ # 2. refresh existing index list
+ # 3. create indexes that still don't exist
+ # 4. release lock
+ with_advisory_lock do
+ new_indexes.each do |index|
+ unless index_exists?(index)
+ statement = "CREATE INDEX CONCURRENTLY ON #{quote_ident(index[:table])} (#{index[:columns].map { |c| quote_ident(c) }.join(", ")})"
+ log "Creating index: #{statement}"
+ started_at = Time.now
+ begin
+ execute(statement)
+ log "Index created: #{((Time.now - started_at) * 1000).to_i} ms"
+ rescue PG::LockNotAvailable
+ log "Could not acquire lock: #{index[:table]}"
end
end
end
end
- else
- log "No new indexes found"
end
new_indexes
end
@@ -415,11 +442,11 @@
end
def database_tables
result = execute <<-SQL
SELECT
- table_name
+ table_schema || '.' || table_name AS table_name
FROM
information_schema.tables
WHERE
table_catalog = current_database() AND
table_schema NOT IN ('pg_catalog', 'information_schema')
@@ -437,20 +464,17 @@
INNER JOIN
pg_database ON pg_database.oid = pg_stat_statements.dbid
WHERE
datname = current_database()
AND total_time >= #{@min_time * 60000}
+ AND calls >= #{@min_calls}
ORDER BY
1
SQL
result.map { |q| q["query"] }
end
- def possible_tables(queries)
- Set.new(queries.flat_map(&:tables).uniq & database_tables)
- end
-
def with_advisory_lock
lock_id = 123456
first_time = true
while execute("SELECT pg_try_advisory_lock(#{lock_id})").first["pg_try_advisory_lock"] != "t"
if first_time
@@ -478,30 +502,28 @@
end
def columns(tables)
columns = execute <<-SQL
SELECT
- table_name,
+ table_schema || '.' || table_name AS table_name,
column_name,
data_type
FROM
information_schema.columns
WHERE
- table_schema = 'public' AND
- table_name IN (#{tables.map { |t| quote(t) }.join(", ")})
+ table_schema || '.' || table_name IN (#{tables.map { |t| quote(t) }.join(", ")})
ORDER BY
1, 2
SQL
columns.map { |v| {table: v["table_name"], column: v["column_name"], type: v["data_type"]} }
end
def indexes(tables)
execute(<<-SQL
SELECT
- schemaname AS schema,
- t.relname AS table,
+ schemaname || '.' || t.relname AS table,
ix.relname AS name,
regexp_replace(pg_get_indexdef(i.indexrelid), '^[^\\(]*\\((.*)\\)$', '\\1') AS columns,
regexp_replace(pg_get_indexdef(i.indexrelid), '.* USING ([^ ]*) \\(.*', '\\1') AS using
FROM
pg_index i
@@ -510,30 +532,33 @@
INNER JOIN
pg_class ix ON ix.oid = i.indexrelid
LEFT JOIN
pg_stat_user_indexes ui ON ui.indexrelid = i.indexrelid
WHERE
- t.relname IN (#{tables.map { |t| quote(t) }.join(", ")}) AND
- schemaname IS NOT NULL AND
+ schemaname || '.' || t.relname IN (#{tables.map { |t| quote(t) }.join(", ")}) AND
indisvalid = 't' AND
indexprs IS NULL AND
indpred IS NULL
ORDER BY
1, 2
SQL
).map { |v| v["columns"] = v["columns"].sub(") WHERE (", " WHERE ").split(", ").map { |c| unquote(c) }; v }
end
+ def search_path
+ execute("SHOW search_path")[0]["search_path"].split(",").map(&:strip)
+ end
+
def unquote(part)
- if part && part.start_with?('"')
+ if part && part.start_with?('"') && part.end_with?('"')
part[1..-2]
else
part
end
end
def quote_ident(value)
- conn.quote_ident(value)
+ value.split(".").map { |v| conn.quote_ident(v) }.join(".")
end
def quote(value)
if value.is_a?(String)
"'#{quote_string(value)}'"