lib/dexter/indexer.rb in pgdexter-0.1.4 vs lib/dexter/indexer.rb in pgdexter-0.1.5

- old
+ new

@@ -7,14 +7,21 @@ @create = options[:create] @log_level = options[:log_level] @exclude_tables = options[:exclude] @log_sql = options[:log_sql] @log_explain = options[:log_explain] + @min_time = options[:min_time] || 0 - create_extension + create_extension unless extension_exists? end + def process_stat_statements + queries = stat_statements.map { |q| Query.new(q) }.sort_by(&:fingerprint).group_by(&:fingerprint).map { |_, v| v.first } + log "Processing #{queries.size} new query fingerprints" + process_queries(queries) + end + def process_queries(queries) # reset hypothetical indexes reset_hypothetical_indexes # filter queries from other databases and system tables @@ -44,13 +51,21 @@ private def create_extension execute("SET client_min_messages = warning") - execute("CREATE EXTENSION IF NOT EXISTS hypopg") + begin + execute("CREATE EXTENSION IF NOT EXISTS hypopg") + rescue PG::InsufficientPrivilege + abort "Use a superuser to run: CREATE EXTENSION hypopg" + end end + def extension_exists? + execute("SELECT * FROM pg_available_extensions WHERE name = 'hypopg' AND installed_version IS NOT NULL").any? + end + def reset_hypothetical_indexes execute("SELECT hypopg_reset()") end def analyze_tables(tables) @@ -206,21 +221,24 @@ end end end if @create - # TODO use advisory locks # 1. create lock # 2. refresh existing index list # 3. create indexes that still don't exist # 4. release lock - new_indexes.each do |index| - statement = "CREATE INDEX CONCURRENTLY ON #{quote_ident(index[:table])} (#{index[:columns].map { |c| quote_ident(c) }.join(", ")})" - log "Creating index: #{statement}" - started_at = Time.now - execute(statement) - log "Index created: #{((Time.now - started_at) * 1000).to_i} ms" + with_advisory_lock do + new_indexes.each do |index| + unless index_exists?(index) + statement = "CREATE INDEX CONCURRENTLY ON #{quote_ident(index[:table])} (#{index[:columns].map { |c| quote_ident(c) }.join(", ")})" + log "Creating index: #{statement}" + started_at = Time.now + execute(statement) + log "Index created: #{((Time.now - started_at) * 1000).to_i} ms" + end + end end end else log "No new indexes found" end @@ -240,11 +258,11 @@ connect_timeout: 3 }.reject { |_, value| value.to_s.empty? } PG::Connection.new(config) end rescue PG::ConnectionBad - abort "Bad database url" + abort "Can't connect to database" end def execute(query) # use exec_params instead of exec for security # @@ -282,15 +300,61 @@ FROM information_schema.tables WHERE table_catalog = current_database() AND table_schema NOT IN ('pg_catalog', 'information_schema') + AND table_type = 'BASE TABLE' SQL result.map { |r| r["table_name"] } end + def stat_statements + result = execute <<-SQL + SELECT + DISTINCT query + FROM + pg_stat_statements + INNER JOIN + pg_database ON pg_database.oid = pg_stat_statements.dbid + WHERE + datname = current_database() + AND total_time >= #{@min_time * 60000} + ORDER BY + 1 + SQL + result.map { |q| q["query"] } + end + def possible_tables(queries) Set.new(queries.flat_map(&:tables).uniq & database_tables) + end + + def with_advisory_lock + lock_id = 123456 + first_time = true + while execute("SELECT pg_try_advisory_lock(#{lock_id})").first["pg_try_advisory_lock"] != "t" + if first_time + log "Waiting for lock..." + first_time = false + end + sleep(1) + end + yield + ensure + with_min_messages("error") do + execute("SELECT pg_advisory_unlock(#{lock_id})") + end + end + + def with_min_messages(value) + execute("SET client_min_messages = #{quote(value)}") + yield + ensure + execute("SET client_min_messages = warning") + end + + def index_exists?(index) + indexes([index[:table]]).find { |i| i["columns"] == index[:columns] } end def columns(tables) columns = execute <<-SQL SELECT