lib/dexter/log_parser.rb in pgdexter-0.1.0 vs lib/dexter/log_parser.rb in pgdexter-0.1.1
- old
+ new
@@ -1,21 +1,41 @@
module Dexter
class LogParser
REGEX = /duration: (\d+\.\d+) ms (statement|execute <unnamed>): (.+)/
- def initialize(logfile, options = {})
+ def initialize(logfile, client)
@logfile = logfile
- @min_time = options[:min_time] * 60000 # convert minutes to ms
- end
-
- def queries
+ @min_time = client.options[:min_time] * 60000 # convert minutes to ms
@top_queries = {}
+ @indexer = Indexer.new(client)
+ @new_queries = Set.new
+ @new_queries_mutex = Mutex.new
+ @process_queries_mutex = Mutex.new
+ @last_checked_at = {}
+ log "Started"
+
+ if @logfile == STDIN
+ Thread.abort_on_exception = true
+
+ @timer_thread = Thread.new do
+ sleep(3) # starting sleep
+ loop do
+ @process_queries_mutex.synchronize do
+ process_queries
+ end
+ sleep(client.options[:interval])
+ end
+ end
+ end
+ end
+
+ def perform
active_line = nil
duration = nil
- File.foreach(@logfile) do |line|
+ each_line do |line|
if active_line
if line.include?(": ")
process_entry(active_line, duration)
active_line = nil
duration = nil
@@ -31,19 +51,74 @@
# skip
end
end
process_entry(active_line, duration) if active_line
- @top_queries.select { |_, v| v[:total_time] > @min_time }.map { |_, v| v[:query] }
+ @process_queries_mutex.synchronize do
+ process_queries
+ end
end
private
+ def each_line
+ if @logfile == STDIN
+ STDIN.each_line do |line|
+ yield line
+ end
+ else
+ File.foreach(@logfile) do |line|
+ yield line
+ end
+ end
+ end
+
def process_entry(query, duration)
return unless query =~ /SELECT/i
- fingerprint = PgQuery.fingerprint(query)
- @top_queries[fingerprint] ||= {calls: 0, total_time: 0, query: query}
+ fingerprint =
+ begin
+ PgQuery.fingerprint(query)
+ rescue PgQuery::ParseError
+ # do nothing
+ end
+ return unless fingerprint
+
+ @top_queries[fingerprint] ||= {calls: 0, total_time: 0}
@top_queries[fingerprint][:calls] += 1
@top_queries[fingerprint][:total_time] += duration
+ @top_queries[fingerprint][:query] = query
+ @new_queries_mutex.synchronize do
+ @new_queries << fingerprint
+ end
+ end
+
+ def process_queries
+ new_queries = nil
+
+ @new_queries_mutex.synchronize do
+ new_queries = @new_queries.dup
+ @new_queries.clear
+ end
+
+ now = Time.now
+ min_checked_at = now - 3600 # don't recheck for an hour
+ queries = []
+ fingerprints = {}
+ @top_queries.each do |k, v|
+ if new_queries.include?(k) && v[:total_time] > @min_time && (!@last_checked_at[k] || @last_checked_at[k] < min_checked_at)
+ fingerprints[v[:query]] = k
+ queries << v[:query]
+ @last_checked_at[k] = now
+ end
+ end
+
+ log "Processing #{queries.size} new query fingerprints"
+ if queries.any?
+ @indexer.process_queries(queries)
+ end
+ end
+
+ def log(message)
+ puts "#{Time.now.iso8601} #{message}"
end
end
end