require 'shellwords'

module RequestLogAnalyzer::Source

  # The LogParser class reads log data from a given source and uses a file format definition
  # to parse all relevant information about requests from the file. A FileFormat module should
  # be provided that contains the definitions of the lines that occur in the log data.
  #
  # The order in which lines occur is used to combine lines into a single request. If these lines
  # are mixed, requests cannot be combined properly. This can be the case if data is written to
  # the log file simultaneously by different mongrel processes. This problem is detected by the
  # parser. It will emit warnings when this occurs. LogParser supports multiple parse strategies
  # that deal differently with this problem.
  class LogParser < Base

    include Enumerable

    # The default parse strategy that will be used to parse the input.
    DEFAULT_PARSE_STRATEGY = 'assume-correct'.freeze

    # All available parse strategies.
    PARSE_STRATEGIES = ['cautious', 'assume-correct'].freeze

    attr_reader :source_files, :current_file, :current_lineno

    # Initializes the log file parser instance.
    # It will apply the language specific FileFormat module to this instance. It will use the line
    # definitions in this module to parse any input that it is given (see parse_io).
    #
    # format:: The current file format instance
    # options:: A hash of options that are used by the parser. <tt>:source_files</tt> holds the
    #           input (an IO, a filename or an Array of filenames) and <tt>:parse_strategy</tt>
    #           selects one of PARSE_STRATEGIES (defaults to DEFAULT_PARSE_STRATEGY).
    #
    # Raises a RuntimeError if the requested parse strategy is not in PARSE_STRATEGIES.
    def initialize(format, options = {})
      super(format, options)
      @parsed_lines     = 0
      @parsed_requests  = 0
      @skipped_lines    = 0
      @skipped_requests = 0
      @current_request  = nil
      @current_source   = nil
      @current_file     = nil
      @current_lineno   = nil
      @source_files     = options[:source_files]
      @progress_handler = nil

      @options[:parse_strategy] ||= DEFAULT_PARSE_STRATEGY
      raise "Unknown parse strategy" unless PARSE_STRATEGIES.include?(@options[:parse_strategy])
    end

    # Reads the input, which can either be a file, sequence of files or STDIN to parse
    # lines specified in the FileFormat. These lines will be combined into Request instances,
    # that will be yielded. The actual parsing occurs in the parse_io method.
    # options:: A Hash of options that will be passed to parse_io.
    #
    # Raises a RuntimeError if the configured source is not an IO, a String or an Array.
    def each_request(options = {}, &block) # :yields: :request, request
      case @source_files
      when IO
        if @source_files == $stdin
          puts "Parsing from the standard input. Press CTRL+C to finish." # FIXME: not here
        end
        parse_stream(@source_files, options, &block)
      when String
        parse_file(@source_files, options, &block)
      when Array
        parse_files(@source_files, options, &block)
      else
        raise "Unknown source provided"
      end
    end

    # Make sure the Enumerable methods work as expected
    alias_method :each, :each_request

    # Parses a list of subsequent files of the same format, by calling parse_file for every
    # file in the array.
    # files:: The Array of files that should be parsed
    # options:: A Hash of options that will be passed to parse_io.
    def parse_files(files, options = {}, &block) # :yields: request
      files.each { |file| parse_file(file, options, &block) }
    end

    # Check if a file has a compressed extension in the filename.
    # If recognized, return the command string used to decompress the file to stdout;
    # otherwise return an empty string.
    #
    # The filename is shell-escaped because the returned command is handed to IO.popen as a
    # single string (and is therefore interpreted by a shell): paths containing spaces or
    # shell metacharacters must not be split or executed.
    #
    # filename:: The name of the file to inspect.
    def decompress_file?(filename)
      nice_command = "nice -n 5"
      escaped      = Shellwords.escape(filename)

      # \z anchors at the real end of the string; '.tar.gz' is covered by the '.gz' suffix.
      return "#{nice_command} gunzip -c -d #{escaped}"  if filename =~ /\.(?:gz|tgz)\z/
      return "#{nice_command} bunzip2 -c -d #{escaped}" if filename =~ /\.bz2\z/
      return "#{nice_command} unzip -p #{escaped}"      if filename =~ /\.zip\z/

      ""
    end

    # Parses a log file. Creates an IO stream for the provided file, and sends it to parse_io for
    # further handling. This method supports progress updates that can be used to display a progressbar
    #
    # If the logfile is compressed, it is uncompressed to stdout and read.
    # TODO: Check if IO.popen encounters problems with the given command line.
    # TODO: Fix progress bar that is broken for IO.popen, as it returns a single string.
    #
    # file:: The file that should be parsed.
    # options:: A Hash of options that will be passed to parse_io.
    def parse_file(file, options = {}, &block)
      @current_source = File.expand_path(file)
      @source_changes_handler.call(:started, @current_source) if @source_changes_handler

      decompress_command = decompress_file?(file)
      if decompress_command.empty?
        begin
          # Progress reporting is only activated for plain files.
          @progress_handler = @dormant_progress_handler
          @progress_handler.call(:started, file) if @progress_handler
          File.open(file, 'r') { |f| parse_io(f, options, &block) }
          @progress_handler.call(:finished, file) if @progress_handler
        ensure
          # Always deactivate the handler, even when parsing raises, so that a later parse of
          # a pipe or stream does not report progress through a stale handler.
          @progress_handler = nil
        end
      else
        IO.popen(decompress_command, 'r') { |f| parse_io(f, options, &block) }
      end

      @source_changes_handler.call(:finished, @current_source) if @source_changes_handler
      @current_source = nil
    end

    # Parses an IO stream. It will simply call parse_io. This function does not support progress updates
    # because the length of a stream is not known.
    # stream:: The IO stream that should be parsed.
    # options:: A Hash of options that will be passed to parse_io.
    def parse_stream(stream, options = {}, &block)
      parse_io(stream, options, &block)
    end

    # This method loops over each line of the input stream. It will try to parse this line as any of
    # the lines that are defined by the current file format (see RequestLogAnalyzer::FileFormat).
    # It will then combine these parsed lines into requests using heuristics. These requests (see
    # RequestLogAnalyzer::Request) will then be yielded for further processing in the pipeline.
    #
    # - The file format's parse_line method is called to turn a line into a hash of request data.
    # - update_current_request is used to combine parsed lines into requests using heuristics.
    # - The method will yield progress updates if a progress handler is installed using progress=
    # - The method will yield parse warnings if a warning handler is installed using warning=
    #
    # io:: The IO instance to use as source
    # options:: A hash of options that can be used by the parser.
    def parse_io(io, options = {}, &block) # :yields: request
      @current_lineno = 1
      while line = io.gets
        # Only report progress once every 255 lines.
        @progress_handler.call(:progress, io.pos) if @progress_handler && @current_lineno % 255 == 0

        if request_data = file_format.parse_line(line) { |wt, message| warn(wt, message) }
          @parsed_lines += 1
          update_current_request(request_data.merge(:source => @current_source, :lineno => @current_lineno), &block)
        end

        @current_lineno += 1
      end

      warn(:unfinished_request_on_eof, "End of file reached, but last request was not completed!") unless @current_request.nil?
      @current_lineno = nil
    end

    # Installs a progress handler that is used while parsing files.
    # proc:: The proc that will be called to handle progress update messages
    def progress=(proc)
      @dormant_progress_handler = proc
    end

    # Installs a warning handler that is used while parsing.
    # proc:: The proc that will be called to handle parse warning messages
    def warning=(proc)
      @warning_handler = proc
    end

    # Installs a source change handler that is used while parsing.
    # proc:: The proc that will be called to handle source changes
    def source_changes=(proc)
      @source_changes_handler = proc
    end

    # This method is called by the parser if it encounters any parsing problems.
    # It will call the installed warning handler if any, passing the current line number along.
    #
    # By default, RequestLogAnalyzer::Controller will install a warning handler
    # that will pass the warnings to each aggregator so they can do something useful
    # with it.
    #
    # type:: The warning type (a Symbol)
    # message:: A message explaining the warning
    def warn(type, message)
      @warning_handler.call(type, message, @current_lineno) if @warning_handler
    end

    protected

    # Combines the different lines of a request into a single Request object. It will start a
    # new request when a header line is encountered and will emit the request when a footer line
    # is encountered.
    #
    # Combining the lines is done using heuristics. Problems can occur in this process. The
    # current parse strategy defines how these cases are handled.
    #
    # When using the 'assume-correct' parse strategy (default):
    # - Every line that is parsed before a header line is ignored as it cannot be included in
    #   any request. It will emit a :no_current_request warning.
    # - If a header line is found before the previous request was closed, the previous request
    #   will be yielded and a new request will be started.
    #
    # When using the 'cautious' parse strategy:
    # - Every line that is parsed before a header line is ignored as it cannot be included in
    #   any request. It will emit a :no_current_request warning.
    # - A header line that is parsed before a request is closed by a footer line, is a sign of
    #   an improperly ordered file. All data that is gathered for the request until then is
    #   discarded and the next request is ignored as well. An :unclosed_request warning is
    #   emitted.
    #
    # request_data:: A hash of data that was parsed from the last line.
    def update_current_request(request_data, &block) # :yields: request
      if header_line?(request_data)
        if @current_request
          case options[:parse_strategy]
          when 'assume-correct'
            handle_request(@current_request, &block)
            # NOTE(review): if this line is also a footer line, the new request stays open
            # until the next header line arrives -- confirm whether that is intended.
            @current_request = @file_format.request(request_data)
          when 'cautious'
            @skipped_lines += 1
            warn(:unclosed_request, "Encountered header line (#{request_data[:line_definition].name.inspect}), but previous request was not closed!")
            @current_request = nil # remove all data that was parsed, skip next request as well.
          end
        elsif footer_line?(request_data)
          # A line that is both header and footer forms a complete request on its own.
          handle_request(@file_format.request(request_data), &block)
        else
          @current_request = @file_format.request(request_data)
        end
      else
        if @current_request
          @current_request << request_data
          if footer_line?(request_data)
            handle_request(@current_request, &block) # yield @current_request
            @current_request = nil
          end
        else
          @skipped_lines += 1
          warn(:no_current_request, "Parsebale line (#{request_data[:line_definition].name.inspect}) found outside of a request!")
        end
      end
    end

    # Handles the parsed request by sending it into the pipeline.
    #
    # - It will call RequestLogAnalyzer::Request#validate on the request instance
    # - It will yield the request to the given block (if any); a falsy block result counts
    #   the request as skipped.
    # - It will update the parsed_requests and skipped_requests variables accordingly
    #
    # request:: The parsed request instance (RequestLogAnalyzer::Request)
    def handle_request(request, &block) # :yields: :request, request
      @parsed_requests += 1
      request.validate
      accepted = block_given? ? yield(request) : true
      @skipped_requests += 1 unless accepted
    end

    # Checks whether a given line hash is a header line according to the current file format.
    # hash:: A hash of data that was parsed from the line.
    def header_line?(hash)
      hash[:line_definition].header
    end

    # Checks whether a given line hash is a footer line according to the current file format.
    # hash:: A hash of data that was parsed from the line.
    def footer_line?(hash)
      hash[:line_definition].footer
    end
  end
end