Sha256: face1a85a46c6b14b13966f5fa8ea57b18fedcdf1cbd3cb255532d317aec5ad6

Contents?: true

Size: 1.38 KB

Versions: 4

Compression:

Stored size: 1.38 KB

Contents

module Dexter
  # Ties together a Collector, a LogParser and an Indexer for one log source:
  # the parser is run over the log, and query fingerprints that have not been
  # handed to the indexer within the last hour are passed on to it.
  class Processor
    include Logging

    # Seconds a fingerprint is held back after it was last sent to the indexer.
    RECHECK_INTERVAL = 3600

    # @param database_url forwarded unchanged to Indexer.new
    # @param logfile log source forwarded to LogParser.new; when it is STDIN,
    #   #perform additionally processes queries periodically in a background
    #   thread
    # @param options [Hash] :min_time is forwarded to Collector.new, :interval
    #   is the number of seconds slept between background runs, and the whole
    #   hash is forwarded to Indexer.new
    def initialize(database_url, logfile, options)
      log "Started"

      @logfile = logfile

      @collector = Collector.new(min_time: options[:min_time])
      @log_parser = LogParser.new(logfile, @collector)
      @indexer = Indexer.new(database_url, options)

      # Delay (seconds) before the first background run in STDIN mode.
      @starting_interval = 3
      # NOTE(review): used as the argument to sleep, so callers are assumed to
      # always supply a numeric :interval - confirm against the CLI defaults.
      @interval = options[:interval]

      # Serializes process_queries between the background thread and #perform.
      @mutex = Mutex.new
      # fingerprint => Time it was last handed to the indexer.
      @last_checked_at = {}
    end

    # Runs the log parser and then processes the collected queries once.
    #
    # When the log source is STDIN, a background thread is started first that
    # processes queries after @starting_interval seconds and then every
    # @interval seconds, alongside the parser.
    def perform
      if @logfile == STDIN
        # Process-wide setting: an exception in any thread is re-raised in the
        # main thread instead of only terminating that thread.
        Thread.abort_on_exception = true
        Thread.new do
          sleep(@starting_interval)
          loop do
            process_queries
            sleep(@interval)
          end
        end
      end

      @log_parser.perform

      # Final pass for whatever was collected by the parser run above.
      process_queries
    end

    private

    # Mutex-guarded wrapper so only one thread processes queries at a time.
    def process_queries
      @mutex.synchronize do
        process_queries_without_lock
      end
    end

    # Fetches queries from the collector, keeps those whose fingerprint has
    # not been processed within RECHECK_INTERVAL, and passes them to the
    # indexer. Must only be called while holding @mutex.
    def process_queries_without_lock
      now = Time.now
      min_checked_at = now - RECHECK_INTERVAL # don't recheck for an hour

      # An expired entry is treated exactly like a missing one, so dropping it
      # changes nothing about which queries are selected; it only keeps the
      # hash from growing without bound while the process keeps running.
      @last_checked_at.delete_if { |_fingerprint, checked_at| checked_at < min_checked_at }

      queries = []
      @collector.fetch_queries.each do |query|
        fingerprint = query.fingerprint
        # Still inside the recheck window (this also skips a repeated
        # fingerprint later in the same batch).
        next if @last_checked_at.key?(fingerprint)

        queries << query
        @last_checked_at[fingerprint] = now
      end

      log "Processing #{queries.size} new query fingerprints"
      @indexer.process_queries(queries) if queries.any?
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
pgdexter-0.1.5 lib/dexter/processor.rb
pgdexter-0.1.4 lib/dexter/processor.rb
pgdexter-0.1.3 lib/dexter/processor.rb
pgdexter-0.1.2 lib/dexter/processor.rb