Sha256: 694dd2226b4c8d336fee1411ee377035d1f7caf053188cfefe66a17d1993d2cc

Contents?: true

Size: 1.71 KB

Versions: 1

Compression:

Stored size: 1.71 KB

Contents

module Dexter
  # Wires a log parser, a Collector, and an Indexer together.
  #
  # The log parser is given the Collector at construction time; queries
  # fetched from the Collector are handed to the Indexer, at most once per
  # RECHECK_INTERVAL for any given query fingerprint.
  class Processor
    include Logging

    # Seconds during which a fingerprint that was already handed to the
    # indexer is skipped (don't recheck for an hour).
    RECHECK_INTERVAL = 3600

    # @param logfile the log source passed to the parser; when it is STDIN,
    #   #perform also processes queries periodically in a background thread
    # @param options [Hash] reads :min_time, :min_calls, :input_format and
    #   :interval; the whole hash is also passed to Indexer.new
    def initialize(logfile, options)
      @logfile = logfile

      @collector = Collector.new(min_time: options[:min_time], min_calls: options[:min_calls])
      @log_parser =
        if options[:input_format] == "csv"
          CsvLogParser.new(logfile, @collector)
        else
          LogParser.new(logfile, @collector)
        end

      @indexer = Indexer.new(options)

      # Delay before the first periodic pass, then the delay between passes.
      @starting_interval = 3
      @interval = options[:interval]

      # Serializes the background thread and the final pass in #perform,
      # both of which read and write @last_checked_at.
      @mutex = Mutex.new
      # fingerprint => Time the query was last handed to the indexer
      @last_checked_at = {}

      log "Started"
    end

    # Runs the log parser and then processes the collected queries one final
    # time. When reading from STDIN, queries are additionally processed every
    # @interval seconds while the parser is running.
    #
    # Aborts the process with the error message if the parser raises
    # Errno::ENOENT.
    def perform
      start_periodic_processing if @logfile == STDIN

      begin
        @log_parser.perform
      rescue Errno::ENOENT => e
        abort e.message
      end

      process_queries
    end

    private

    # Spawns the thread that repeatedly calls #process_queries.
    # A PG::ServerError is logged and the loop continues; any other
    # exception takes the process down because abort_on_exception is set.
    def start_periodic_processing
      Thread.abort_on_exception = true
      Thread.new do
        sleep(@starting_interval)
        loop do
          begin
            process_queries
          rescue PG::ServerError => e
            log "ERROR: #{e.class.name}: #{e.message}"
          end
          sleep(@interval)
        end
      end
    end

    # Thread-safe entry point: one pass at a time.
    def process_queries
      @mutex.synchronize do
        process_queries_without_lock
      end
    end

    # Selects the queries that are due and hands them to the indexer.
    # Must only be called while holding @mutex.
    def process_queries_without_lock
      queries = due_queries(Time.now)

      log "Processing #{queries.size} new query fingerprints"
      @indexer.process_queries(queries) if queries.any?
    end

    # Returns the collected queries whose fingerprint has not been checked
    # within RECHECK_INTERVAL of +now+, and stamps each of them with +now+.
    def due_queries(now)
      min_checked_at = now - RECHECK_INTERVAL

      # Expired entries are equivalent to missing ones, so drop them up
      # front. Without this the hash keeps one entry per fingerprint ever
      # seen, which grows without bound in long-running STDIN mode.
      @last_checked_at.delete_if { |_fingerprint, checked_at| checked_at < min_checked_at }

      queries = []
      @collector.fetch_queries.each do |query|
        # Anything still present was checked inside the window (or earlier
        # in this same pass).
        next if @last_checked_at.key?(query.fingerprint)

        queries << query
        @last_checked_at[query.fingerprint] = now
      end
      queries
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pgdexter-0.3.0 lib/dexter/processor.rb