lib/dexter.rb in pgdexter-0.1.0 vs lib/dexter.rb in pgdexter-0.1.1

- old
+ new

@@ -2,10 +2,12 @@ require "slop" require "pg" require "pg_query" require "time" require "set" +require "thread" +require "dexter/indexer" require "dexter/log_parser" module Dexter class Client attr_reader :arguments, :options @@ -20,243 +22,29 @@ # get queries queries = [] if options[:s] queries << options[:s] + Indexer.new(self).process_queries(queries) end if arguments[1] begin - parser = LogParser.new(arguments[1], min_time: options[:min_time]) - queries.concat(parser.queries) + LogParser.new(arguments[1], self).perform rescue Errno::ENOENT abort "Log file not found" end end - - # narrow down queries and tables - tables, queries = narrow_queries(queries) - return if tables.empty? - - # get ready for hypothetical indexes - select_all("SET client_min_messages = warning") - select_all("CREATE EXTENSION IF NOT EXISTS hypopg") - select_all("SELECT hypopg_reset()") - - # ensure tables have recently been analyzed - analyze_tables(tables) - - # get initial plans - initial_plans = {} - queries.each do |query| - begin - initial_plans[query] = plan(query) - rescue PG::Error - # do nothing - end + if !options[:s] && !arguments[1] + LogParser.new(STDIN, self).perform end - queries.select! { |q| initial_plans[q] } - - # get existing indexes - index_set = Set.new - indexes(tables).each do |index| - # TODO make sure btree - index_set << [index["table"], index["columns"]] - end - - # create hypothetical indexes - candidates = {} - columns(tables).each do |col| - unless index_set.include?([col[:table], [col[:column]]]) - candidates[col] = select_all("SELECT * FROM hypopg_create_index('CREATE INDEX ON #{col[:table]} (#{[col[:column]].join(", ")})');").first["indexname"] - end - end - - new_indexes = [] - queries.each do |query| - starting_cost = initial_plans[query]["Total Cost"] - plan2 = plan(query) - cost2 = plan2["Total Cost"] - best_indexes = [] - - candidates.each do |col, index_name| - if plan2.inspect.include?(index_name) - best_indexes << { - table: col[:table], - columns: [col[:column]] - } - end - end - - puts query - puts "Starting cost: #{starting_cost}" - puts "Final cost: #{cost2}" - - # must make it 20% faster - if cost2 < starting_cost * 0.8 - new_indexes.concat(best_indexes) - best_indexes.each do |index| - puts "CREATE INDEX CONCURRENTLY ON #{index[:table]} (#{index[:columns].join(", ")});" - end - else - puts "Nope!" - end - puts - end - - # create indexes - if new_indexes.any? - puts "Indexes to be created:" - new_indexes.uniq.sort_by(&:to_a).each do |index| - statement = "CREATE INDEX CONCURRENTLY ON #{index[:table]} (#{index[:columns].join(", ")})" - puts "#{statement};" - select_all(statement) if options[:create] - end - end end - def conn - @conn ||= begin - uri = URI.parse(arguments[0]) - config = { - host: uri.host, - port: uri.port, - dbname: uri.path.sub(/\A\//, ""), - user: uri.user, - password: uri.password, - connect_timeout: 3 - }.reject { |_, value| value.to_s.empty? } - PG::Connection.new(config) - end - rescue PG::ConnectionBad - abort "Bad database url" - end - - def select_all(query) - conn.exec(query).to_a - end - - def plan(query) - JSON.parse(select_all("EXPLAIN (FORMAT JSON) #{query}").first["QUERY PLAN"]).first["Plan"] - end - - def narrow_queries(queries) - result = select_all <<-SQL - SELECT - table_name - FROM - information_schema.tables - WHERE - table_catalog = current_database() AND - table_schema NOT IN ('pg_catalog', 'information_schema') - SQL - possible_tables = Set.new(result.map { |r| r["table_name"] }) - - tables = queries.flat_map { |q| PgQuery.parse(q).tables }.uniq.select { |t| possible_tables.include?(t) } - - [tables, queries.select { |q| PgQuery.parse(q).tables.all? { |t| possible_tables.include?(t) } }] - end - - def columns(tables) - columns = select_all <<-SQL - SELECT - table_name, - column_name - FROM - information_schema.columns - WHERE - table_schema = 'public' AND - table_name IN (#{tables.map { |t| quote(t) }.join(", ")}) - SQL - - columns.map { |v| {table: v["table_name"], column: v["column_name"]} } - end - - def indexes(tables) - select_all(<<-SQL - SELECT - schemaname AS schema, - t.relname AS table, - ix.relname AS name, - regexp_replace(pg_get_indexdef(i.indexrelid), '^[^\\(]*\\((.*)\\)$', '\\1') AS columns, - regexp_replace(pg_get_indexdef(i.indexrelid), '.* USING ([^ ]*) \\(.*', '\\1') AS using, - indisunique AS unique, - indisprimary AS primary, - indisvalid AS valid, - indexprs::text, - indpred::text, - pg_get_indexdef(i.indexrelid) AS definition - FROM - pg_index i - INNER JOIN - pg_class t ON t.oid = i.indrelid - INNER JOIN - pg_class ix ON ix.oid = i.indexrelid - LEFT JOIN - pg_stat_user_indexes ui ON ui.indexrelid = i.indexrelid - WHERE - t.relname IN (#{tables.map { |t| quote(t) }.join(", ")}) AND - schemaname IS NOT NULL AND - indisvalid = 't' AND - indexprs IS NULL AND - indpred IS NULL - ORDER BY - 1, 2 - SQL - ).map { |v| v["columns"] = v["columns"].sub(") WHERE (", " WHERE ").split(", ").map { |c| unquote(c) }; v } - end - - def unquote(part) - if part && part.start_with?('"') - part[1..-2] - else - part - end - end - - def analyze_tables(tables) - analyze_stats = select_all <<-SQL - SELECT - schemaname AS schema, - relname AS table, - last_analyze, - last_autoanalyze - FROM - pg_stat_user_tables - WHERE - relname IN (#{tables.map { |t| quote(t) }.join(", ")}) - SQL - - last_analyzed = {} - analyze_stats.each do |stats| - last_analyzed[stats["table"]] = Time.parse(stats["last_analyze"]) if stats["last_analyze"] - end - - tables.each do |table| - if !last_analyzed[table] || last_analyzed[table] < Time.now - 3600 - puts "Analyzing #{table}" - select_all("ANALYZE #{table}") - end - end - end - - def quote(value) - if value.is_a?(String) - "'#{quote_string(value)}'" - else - value - end - end - - # activerecord - def quote_string(s) - s.gsub(/\\/, '\&\&').gsub(/'/, "''") - end - def parse_args(args) opts = Slop.parse(args) do |o| o.boolean "--create", default: false o.string "-s" o.float "--min-time", default: 0 + o.integer "--interval", default: 60 end [opts.arguments, opts.to_hash] rescue Slop::Error => e abort e.message end