module Dexter
  class Processor
    include Logging

    def initialize(logfile, options)
      @logfile = logfile

      @collector = Collector.new(min_time: options[:min_time], min_calls: options[:min_calls])
      @indexer = Indexer.new(options)

      # pick a log parser based on the log source and input format
      @log_parser =
        if @logfile == :pg_stat_activity
          PgStatActivityParser.new(@indexer, @collector)
        elsif @logfile == :log_table
          if options[:input_format] == "csv"
            CsvLogTableParser.new(@indexer, @collector)
          else
            StderrLogTableParser.new(@indexer, @collector)
          end
        elsif options[:input_format] == "csv"
          CsvLogParser.new(logfile, @collector)
        elsif options[:input_format] == "json"
          JsonLogParser.new(logfile, @collector)
        elsif options[:input_format] == "sql"
          SqlLogParser.new(logfile, @collector)
        else
          StderrLogParser.new(logfile, @collector)
        end

      @starting_interval = 3
      @interval = options[:interval]
      @log_parser.once = options[:once]

      @mutex = Mutex.new
      @last_checked_at = {}

      log "Started"
    end

    def perform
      # for streaming sources, process collected queries on an interval in a background thread
      if [STDIN, :pg_stat_activity, :log_table].include?(@logfile) && !@log_parser.once
        Thread.abort_on_exception = true
        Thread.new do
          sleep(@starting_interval)
          loop do
            begin
              process_queries
            rescue PG::ServerError => e
              log colorize("ERROR: #{e.class.name}: #{e.message}", :red)
            end
            sleep(@interval)
          end
        end
      end

      begin
        @log_parser.perform
      rescue Errno::ENOENT => e
        raise Dexter::Abort, "ERROR: #{e.message}"
      end

      process_queries
    end

    private

    def process_queries
      @mutex.synchronize do
        process_queries_without_lock
      end
    end

    def process_queries_without_lock
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      min_checked_at = now - 3600 # don't recheck for an hour
      queries = []
      @collector.fetch_queries.each do |query|
        if !@last_checked_at[query.fingerprint] || @last_checked_at[query.fingerprint] < min_checked_at
          queries << query
          @last_checked_at[query.fingerprint] = now
        end
      end

      log "Processing #{queries.size} new query fingerprints"
      @indexer.process_queries(queries) if queries.any?
    end
  end
end