module RequestLogAnalyzer::Source
# The LogParser class reads log data from a given source and uses a file format definition
# to parse all relevent information about requests from the file. A FileFormat module should
# be provided that contains the definitions of the lines that occur in the log data.
#
# De order in which lines occur is used to combine lines to a single request. If these lines
# are mixed, requests cannot be combined properly. This can be the case if data is written to
# the log file simultaneously by different mongrel processes. This problem is detected by the
# parser. It will emit warnings when this occurs. LogParser supports multiple parse strategies
# that deal differently with this problem.
class LogParser < Base
include Enumerable
# The maximum number of bytes to read from a line.
DEFAULT_MAX_LINE_LENGTH = 8096
DEFAULT_LINE_DIVIDER = "\n"
# The default parse strategy that will be used to parse the input.
DEFAULT_PARSE_STRATEGY = 'assume-correct'
# All available parse strategies.
PARSE_STRATEGIES = ['cautious', 'assume-correct']
attr_reader :source_files, :current_file, :current_lineno, :processed_files
attr_reader :warnings, :parsed_lines, :parsed_requests, :skipped_lines, :skipped_requests
# Initializes the log file parser instance.
# It will apply the language specific FileFormat module to this instance. It will use the line
# definitions in this module to parse any input that it is given (see parse_io).
#
# format:: The current file format instance
# options:: A hash of options that are used by the parser
def initialize(format, options = {})
super(format, options)
@warnings = 0
@parsed_lines = 0
@parsed_requests = 0
@skipped_lines = 0
@skipped_requests = 0
@current_request = nil
@current_source = nil
@current_file = nil
@current_lineno = nil
@processed_files = []
@source_files = options[:source_files]
@progress_handler = nil
@warning_handler = nil
@options[:parse_strategy] ||= DEFAULT_PARSE_STRATEGY
unless PARSE_STRATEGIES.include?(@options[:parse_strategy])
raise "Unknown parse strategy: #{@options[@parse_strategy]}"
end
end
def max_line_length
file_format.max_line_length || DEFAULT_MAX_LINE_LENGTH
end
def line_divider
file_format.line_divider || DEFAULT_LINE_DIVIDER
end
# Reads the input, which can either be a file, sequence of files or STDIN to parse
# lines specified in the FileFormat. This lines will be combined into Request instances,
# that will be yielded. The actual parsing occurs in the parse_io method.
# options:: A Hash of options that will be pased to parse_io.
def each_request(options = {}, &block) # :yields: :request, request
case @source_files
when IO
if @source_files == $stdin
puts "Parsing from the standard input. Press CTRL+C to finish." # FIXME: not here
end
parse_stream(@source_files, options, &block)
when String
parse_file(@source_files, options, &block)
when Array
parse_files(@source_files, options, &block)
else
raise "Unknown source provided"
end
end
# Make sure the Enumerable methods work as expected
alias_method :each, :each_request
# Parses a list of subsequent files of the same format, by calling parse_file for every
# file in the array.
# files:: The Array of files that should be parsed
# options:: A Hash of options that will be pased to parse_io.
def parse_files(files, options = {}, &block) # :yields: request
files.each { |file| parse_file(file, options, &block) }
end
# Check if a file has a compressed extention in the filename.
# If recognized, return the command string used to decompress the file
def decompress_file?(filename)
nice_command = "nice -n 5"
return "#{nice_command} gunzip -c -d #{filename}" if filename.match(/\.tar.gz$/) || filename.match(/\.tgz$/) || filename.match(/\.gz$/)
return "#{nice_command} bunzip2 -c -d #{filename}" if filename.match(/\.bz2$/)
return "#{nice_command} unzip -p #{filename}" if filename.match(/\.zip$/)
return ""
end
# Parses a log file. Creates an IO stream for the provided file, and sends it to parse_io for
# further handling. This method supports progress updates that can be used to display a progressbar
#
# If the logfile is compressed, it is uncompressed to stdout and read.
# TODO: Check if IO.popen encounters problems with the given command line.
# TODO: Fix progress bar that is broken for IO.popen, as it returns a single string.
#
# file:: The file that should be parsed.
# options:: A Hash of options that will be pased to parse_io.
def parse_file(file, options = {}, &block)
if File.directory?(file)
parse_files(Dir["#{ file }/*"], options, &block)
return
end
@current_source = File.expand_path(file)
@source_changes_handler.call(:started, @current_source) if @source_changes_handler
if decompress_file?(file).empty?
@progress_handler = @dormant_progress_handler
@progress_handler.call(:started, file) if @progress_handler
File.open(file, 'rb') { |f| parse_io(f, options, &block) }
@progress_handler.call(:finished, file) if @progress_handler
@progress_handler = nil
@processed_files.push(@current_source.dup)
else
IO.popen(decompress_file?(file), 'rb') { |f| parse_io(f, options, &block) }
end
@source_changes_handler.call(:finished, @current_source) if @source_changes_handler
@current_source = nil
end
# Parses an IO stream. It will simply call parse_io. This function does not support progress updates
# because the length of a stream is not known.
# stream:: The IO stream that should be parsed.
# options:: A Hash of options that will be pased to parse_io.
def parse_stream(stream, options = {}, &block)
parse_io(stream, options, &block)
end
# Parses a string. It will simply call parse_io. This function does not support progress updates.
# string:: The string that should be parsed.
# options:: A Hash of options that will be pased to parse_io.
def parse_string(string, options = {}, &block)
parse_io(StringIO.new(string), options, &block)
end
# This method loops over each line of the input stream. It will try to parse this line as any of
# the lines that are defined by the current file format (see RequestLogAnalyazer::FileFormat).
# It will then combine these parsed line into requests using heuristics. These requests (see
# RequestLogAnalyzer::Request) will then be yielded for further processing in the pipeline.
#
# - RequestLogAnalyzer::LineDefinition#matches is called to test if a line matches a line definition of the file format.
# - update_current_request is used to combine parsed lines into requests using heuristics.
# - The method will yield progress updates if a progress handler is installed using progress=
# - The method will yield parse warnings if a warning handler is installed using warning=
#
# This is a Ruby 1.9 specific version that offers memory protection.
#
# io:: The IO instance to use as source
# options:: A hash of options that can be used by the parser.
def parse_io_19(io, options = {}, &block) # :yields: request
@max_line_length = options[:max_line_length] || max_line_length
@line_divider = options[:line_divider] || line_divider
@current_lineno = 0
while line = io.gets(@line_divider, @max_line_length)
@current_lineno += 1
@progress_handler.call(:progress, io.pos) if @progress_handler && @current_lineno % 255 == 0
parse_line(line, &block)
end
warn(:unfinished_request_on_eof, "End of file reached, but last request was not completed!") unless @current_request.nil?
@current_lineno = nil
end
# This method loops over each line of the input stream. It will try to parse this line as any of
# the lines that are defined by the current file format (see RequestLogAnalyazer::FileFormat).
# It will then combine these parsed line into requests using heuristics. These requests (see
# RequestLogAnalyzer::Request) will then be yielded for further processing in the pipeline.
#
# - RequestLogAnalyzer::LineDefinition#matches is called to test if a line matches a line definition of the file format.
# - update_current_request is used to combine parsed lines into requests using heuristics.
# - The method will yield progress updates if a progress handler is installed using progress=
# - The method will yield parse warnings if a warning handler is installed using warning=
#
# This is a Ruby 1.8 specific version that doesn't offer memory protection.
#
# io:: The IO instance to use as source
# options:: A hash of options that can be used by the parser.
def parse_io_18(io, options = {}, &block) # :yields: request
@line_divider = options[:line_divider] || line_divider
@current_lineno = 0
while line = io.gets(@line_divider)
@current_lineno += 1
@progress_handler.call(:progress, io.pos) if @progress_handler && @current_lineno % 255 == 0
parse_line(line, &block)
end
warn(:unfinished_request_on_eof, "End of file reached, but last request was not completed!") unless @current_request.nil?
@current_lineno = nil
end
alias_method :parse_io, RUBY_VERSION.to_f < 1.9 ? :parse_io_18 : :parse_io_19
# Parses a single line using the current file format. If successful, use the parsed
# information to build a request
# line:: The line to parse
# block:: The block to send fully parsed requests to.
def parse_line(line, &block) # :yields: request
if request_data = file_format.parse_line(line) { |wt, message| warn(wt, message) }
@parsed_lines += 1
update_current_request(request_data.merge(:source => @current_source, :lineno => @current_lineno), &block)
end
end
# Add a block to this method to install a progress handler while parsing.
# proc:: The proc that will be called to handle progress update messages
def progress=(proc)
@dormant_progress_handler = proc
end
# Add a block to this method to install a warning handler while parsing,
# proc:: The proc that will be called to handle parse warning messages
def warning=(proc)
@warning_handler = proc
end
# Add a block to this method to install a source change handler while parsing,
# proc:: The proc that will be called to handle source changes
def source_changes=(proc)
@source_changes_handler = proc
end
# This method is called by the parser if it encounteres any parsing problems.
# It will call the installed warning handler if any.
#
# By default, RequestLogAnalyzer::Controller will install a warning handler
# that will pass the warnings to each aggregator so they can do something useful
# with it.
#
# type:: The warning type (a Symbol)
# message:: A message explaining the warning
def warn(type, message)
@warnings += 1
@warning_handler.call(type, message, @current_lineno) if @warning_handler
end
protected
# Combines the different lines of a request into a single Request object. It will start a
# new request when a header line is encountered en will emit the request when a footer line
# is encountered.
#
# Combining the lines is done using heuristics. Problems can occur in this process. The
# current parse strategy defines how these cases are handled.
#
# When using the 'assume-correct' parse strategy (default):
# - Every line that is parsed before a header line is ignored as it cannot be included in
# any request. It will emit a :no_current_request warning.
# - If a header line is found before the previous requests was closed, the previous request
# will be yielded and a new request will be started.
#
# When using the 'cautious' parse strategy:
# - Every line that is parsed before a header line is ignored as it cannot be included in
# any request. It will emit a :no_current_request warning.
# - A header line that is parsed before a request is closed by a footer line, is a sign of
# an unproperly ordered file. All data that is gathered for the request until then is
# discarded and the next request is ignored as well. An :unclosed_request warning is
# emitted.
#
# request_data:: A hash of data that was parsed from the last line.
def update_current_request(request_data, &block) # :yields: request
if alternative_header_line?(request_data)
if @current_request
@current_request << request_data
else
@current_request = @file_format.request(request_data)
end
elsif header_line?(request_data)
if @current_request
case options[:parse_strategy]
when 'assume-correct'
handle_request(@current_request, &block)
@current_request = @file_format.request(request_data)
when 'cautious'
@skipped_lines += 1
warn(:unclosed_request, "Encountered header line (#{request_data[:line_definition].name.inspect}), but previous request was not closed!")
@current_request = nil # remove all data that was parsed, skip next request as well.
end
elsif footer_line?(request_data)
handle_request(@file_format.request(request_data), &block)
else
@current_request = @file_format.request(request_data)
end
else
if @current_request
@current_request << request_data
if footer_line?(request_data)
handle_request(@current_request, &block) # yield @current_request
@current_request = nil
end
else
@skipped_lines += 1
warn(:no_current_request, "Parseable line (#{request_data[:line_definition].name.inspect}) found outside of a request!")
end
end
end
# Handles the parsed request by sending it into the pipeline.
#
# - It will call RequestLogAnalyzer::Request#validate on the request instance
# - It will send the request into the pipeline, checking whether it was accepted by all the filters.
# - It will update the parsed_requests and skipped_requests variables accordingly
#
# request:: The parsed request instance (RequestLogAnalyzer::Request)
def handle_request(request, &block) # :yields: :request, request
@parsed_requests += 1
request.validate
accepted = block_given? ? yield(request) : true
@skipped_requests += 1 unless accepted
end
# Checks whether a given line hash is an alternative header line according to the current file format.
# hash:: A hash of data that was parsed from the line.
def alternative_header_line?(hash)
hash[:line_definition].header == :alternative
end
# Checks whether a given line hash is a header line according to the current file format.
# hash:: A hash of data that was parsed from the line.
def header_line?(hash)
hash[:line_definition].header == true
end
# Checks whether a given line hash is a footer line according to the current file format.
# hash:: A hash of data that was parsed from the line.
def footer_line?(hash)
hash[:line_definition].footer
end
end
end