lib/dexter/log_parser.rb in pgdexter-0.1.1 vs lib/dexter/log_parser.rb in pgdexter-0.1.2
- lines only in 0.1.1 (old)
+ lines only in 0.1.2 (new)
@@ -1,124 +1,55 @@
module Dexter
  class LogParser
    REGEX = /duration: (\d+\.\d+) ms (statement|execute <unnamed>): (.+)/
+    LINE_SEPERATOR = ": ".freeze

-    def initialize(logfile, client)
+    def initialize(logfile, collector)
      @logfile = logfile
-      @min_time = client.options[:min_time] * 60000 # convert minutes to ms
-      @top_queries = {}
-      @indexer = Indexer.new(client)
-      @new_queries = Set.new
-      @new_queries_mutex = Mutex.new
-      @process_queries_mutex = Mutex.new
-      @last_checked_at = {}
-
-      log "Started"
-
-      if @logfile == STDIN
-        Thread.abort_on_exception = true
-
-        @timer_thread = Thread.new do
-          sleep(3) # starting sleep
-          loop do
-            @process_queries_mutex.synchronize do
-              process_queries
-            end
-            sleep(client.options[:interval])
-          end
-        end
-      end
+      @collector = collector
    end

    def perform
      active_line = nil
      duration = nil

      each_line do |line|
        if active_line
-          if line.include?(": ")
+          if line.include?(LINE_SEPERATOR)
            process_entry(active_line, duration)
            active_line = nil
-            duration = nil
          else
            active_line << line
          end
        end

        if !active_line && m = REGEX.match(line.chomp)
          duration = m[1].to_f
          active_line = m[3]
-        else
-          # skip
        end
      end
      process_entry(active_line, duration) if active_line
-
-      @process_queries_mutex.synchronize do
-        process_queries
-      end
    end

    private

    def each_line
      if @logfile == STDIN
        STDIN.each_line do |line|
          yield line
        end
      else
-        File.foreach(@logfile) do |line|
-          yield line
-        end
-      end
-    end
-
-    def process_entry(query, duration)
-      return unless query =~ /SELECT/i
-      fingerprint =
        begin
-          PgQuery.fingerprint(query)
-        rescue PgQuery::ParseError
-          # do nothing
+          File.foreach(@logfile) do |line|
+            yield line
+          end
+        rescue Errno::ENOENT
+          abort "Log file not found"
        end
-      return unless fingerprint
-
-      @top_queries[fingerprint] ||= {calls: 0, total_time: 0}
-      @top_queries[fingerprint][:calls] += 1
-      @top_queries[fingerprint][:total_time] += duration
-      @top_queries[fingerprint][:query] = query
-      @new_queries_mutex.synchronize do
-        @new_queries << fingerprint
      end
    end

-    def process_queries
-      new_queries = nil
-
-      @new_queries_mutex.synchronize do
-        new_queries = @new_queries.dup
-        @new_queries.clear
-      end
-
-      now = Time.now
-      min_checked_at = now - 3600 # don't recheck for an hour
-      queries = []
-      fingerprints = {}
-      @top_queries.each do |k, v|
-        if new_queries.include?(k) && v[:total_time] > @min_time && (!@last_checked_at[k] || @last_checked_at[k] < min_checked_at)
-          fingerprints[v[:query]] = k
-          queries << v[:query]
-          @last_checked_at[k] = now
-        end
-      end
-
-      log "Processing #{queries.size} new query fingerprints"
-      if queries.any?
-        @indexer.process_queries(queries)
-      end
-    end
-
-    def log(message)
-      puts "#{Time.now.iso8601} #{message}"
+    def process_entry(query, duration)
+      @collector.add(query, duration) if query =~ /SELECT/i
    end
  end
end
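
In 0.1.2 the perform loop uses LINE_SEPERATOR to decide when a multi-line statement has ended: a continuation line (one containing no ": ") is appended to the open entry, and the next line carrying a log prefix closes it. Below is a minimal sketch of that behavior with made-up log lines (real ones depend on log_line_prefix and the exact spacing Postgres emits); only the regex and separator are taken from the file above.

```ruby
# Sketch of how the 0.1.2 perform loop stitches multi-line statements together.
# The log lines are illustrative only; the regex and separator are copied from log_parser.rb.
REGEX = /duration: (\d+\.\d+) ms (statement|execute <unnamed>): (.+)/
LINE_SEPERATOR = ": ".freeze

lines = [
  "LOG: duration: 250.0 ms statement: SELECT *\n",
  "\tFROM users WHERE id = 1\n",                  # no ": ", appended to the open entry
  "LOG: duration: 3.5 ms statement: SELECT 1\n"   # contains ": ", closes the open entry
]

entries = []
active_line = nil
duration = nil

lines.each do |line|
  if active_line
    if line.include?(LINE_SEPERATOR)
      entries << [active_line, duration]
      active_line = nil
    else
      active_line << line
    end
  end

  if !active_line && m = REGEX.match(line.chomp)
    duration = m[1].to_f
    active_line = m[3]
  end
end
entries << [active_line, duration] if active_line

entries.each { |query, ms| puts "#{ms} ms: #{query.inspect}" }
# 250.0 ms: "SELECT *\tFROM users WHERE id = 1\n"
# 3.5 ms: "SELECT 1"
```

Note that duration no longer needs to be reset when an entry is closed: nothing reads it again until the next regex match, which overwrites it, which is why the `duration = nil` line could be dropped.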
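
The other half of the change is that LogParser no longer fingerprints queries or drives the Indexer itself; every statement that looks like a SELECT is handed to the collector passed into the constructor, and #add(query, duration) is the only method this file calls on it. The gem's real Collector class lives elsewhere, so the stand-in below (the class name, the entries accessor, and the usage are assumptions) only illustrates the interface the parser expects.

```ruby
# Minimal stand-in for the collector object LogParser now receives.
# log_parser.rb only ever calls #add(query, duration) on it; everything else here is assumed.
class RecordingCollector
  attr_reader :entries

  def initialize
    @entries = []
  end

  def add(query, duration)
    @entries << {query: query, duration: duration}
  end
end

# Hypothetical usage (file name and output format are illustrative;
# assumes the gem's code is already loaded).
collector = RecordingCollector.new
Dexter::LogParser.new("postgresql.log", collector).perform
collector.entries.first(5).each do |entry|
  puts format("%8.1f ms  %s", entry[:duration], entry[:query])
end
```

Moving collection out of the parser is also what removes the mutexes, the timer thread, and the hourly recheck bookkeeping from this file; that batching and scheduling presumably now live with the collector or its caller.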