lib/dexter/indexer.rb in pgdexter-0.1.4 vs lib/dexter/indexer.rb in pgdexter-0.1.5
- old
+ new
@@ -7,14 +7,21 @@
@create = options[:create]
@log_level = options[:log_level]
@exclude_tables = options[:exclude]
@log_sql = options[:log_sql]
@log_explain = options[:log_explain]
+ @min_time = options[:min_time] || 0
- create_extension
+ create_extension unless extension_exists?
end
+ def process_stat_statements
+ queries = stat_statements.map { |q| Query.new(q) }.sort_by(&:fingerprint).group_by(&:fingerprint).map { |_, v| v.first }
+ log "Processing #{queries.size} new query fingerprints"
+ process_queries(queries)
+ end
+
def process_queries(queries)
# reset hypothetical indexes
reset_hypothetical_indexes
# filter queries from other databases and system tables
@@ -44,13 +51,21 @@
private
def create_extension
execute("SET client_min_messages = warning")
- execute("CREATE EXTENSION IF NOT EXISTS hypopg")
+ begin
+ execute("CREATE EXTENSION IF NOT EXISTS hypopg")
+ rescue PG::InsufficientPrivilege
+ abort "Use a superuser to run: CREATE EXTENSION hypopg"
+ end
end
+ def extension_exists?
+ execute("SELECT * FROM pg_available_extensions WHERE name = 'hypopg' AND installed_version IS NOT NULL").any?
+ end
+
def reset_hypothetical_indexes
execute("SELECT hypopg_reset()")
end
def analyze_tables(tables)
@@ -206,21 +221,24 @@
end
end
end
if @create
- # TODO use advisory locks
# 1. create lock
# 2. refresh existing index list
# 3. create indexes that still don't exist
# 4. release lock
- new_indexes.each do |index|
- statement = "CREATE INDEX CONCURRENTLY ON #{quote_ident(index[:table])} (#{index[:columns].map { |c| quote_ident(c) }.join(", ")})"
- log "Creating index: #{statement}"
- started_at = Time.now
- execute(statement)
- log "Index created: #{((Time.now - started_at) * 1000).to_i} ms"
+ with_advisory_lock do
+ new_indexes.each do |index|
+ unless index_exists?(index)
+ statement = "CREATE INDEX CONCURRENTLY ON #{quote_ident(index[:table])} (#{index[:columns].map { |c| quote_ident(c) }.join(", ")})"
+ log "Creating index: #{statement}"
+ started_at = Time.now
+ execute(statement)
+ log "Index created: #{((Time.now - started_at) * 1000).to_i} ms"
+ end
+ end
end
end
else
log "No new indexes found"
end
@@ -240,11 +258,11 @@
connect_timeout: 3
}.reject { |_, value| value.to_s.empty? }
PG::Connection.new(config)
end
rescue PG::ConnectionBad
- abort "Bad database url"
+ abort "Can't connect to database"
end
def execute(query)
# use exec_params instead of exec for security
#
@@ -282,15 +300,61 @@
FROM
information_schema.tables
WHERE
table_catalog = current_database() AND
table_schema NOT IN ('pg_catalog', 'information_schema')
+ AND table_type = 'BASE TABLE'
SQL
result.map { |r| r["table_name"] }
end
+ def stat_statements
+ result = execute <<-SQL
+ SELECT
+ DISTINCT query
+ FROM
+ pg_stat_statements
+ INNER JOIN
+ pg_database ON pg_database.oid = pg_stat_statements.dbid
+ WHERE
+ datname = current_database()
+ AND total_time >= #{@min_time * 60000}
+ ORDER BY
+ 1
+ SQL
+ result.map { |q| q["query"] }
+ end
+
def possible_tables(queries)
Set.new(queries.flat_map(&:tables).uniq & database_tables)
+ end
+
+ def with_advisory_lock
+ lock_id = 123456
+ first_time = true
+ while execute("SELECT pg_try_advisory_lock(#{lock_id})").first["pg_try_advisory_lock"] != "t"
+ if first_time
+ log "Waiting for lock..."
+ first_time = false
+ end
+ sleep(1)
+ end
+ yield
+ ensure
+ with_min_messages("error") do
+ execute("SELECT pg_advisory_unlock(#{lock_id})")
+ end
+ end
+
+ def with_min_messages(value)
+ execute("SET client_min_messages = #{quote(value)}")
+ yield
+ ensure
+ execute("SET client_min_messages = warning")
+ end
+
+ def index_exists?(index)
+ indexes([index[:table]]).find { |i| i["columns"] == index[:columns] }
end
def columns(tables)
columns = execute <<-SQL
SELECT