lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.4.2 vs lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.5.0

- old
+ new

@@ -1,19 +1,23 @@ # encoding: UTF-8 class Fluent::GrepCounterOutput < Fluent::Output Fluent::Plugin.register_output('grepcounter', self) + REGEXP_MAX_NUM = 20 + def initialize super require 'pathname' end - config_param :input_key, :string + config_param :input_key, :string, :default => nil config_param :regexp, :string, :default => nil - config_param :count_interval, :time, :default => 5 config_param :exclude, :string, :default => nil - config_param :threshold, :integer, :default => nil # obsolete + (1..REGEXP_MAX_NUM).each {|i| config_param :"regexp#{i}", :string, :default => nil } + (1..REGEXP_MAX_NUM).each {|i| config_param :"exclude#{i}", :string, :default => nil } + config_param :count_interval, :time, :default => 5 + config_param :threshold, :integer, :default => nil # not obsolete, though config_param :comparator, :string, :default => '>=' # obsolete config_param :less_than, :float, :default => nil config_param :less_equal, :float, :default => nil config_param :greater_than, :float, :default => nil config_param :greater_equal, :float, :default => nil @@ -34,61 +38,60 @@ attr_accessor :last_checked def configure(conf) super - @count_interval = @count_interval.to_i - @input_key = @input_key.to_s - @regexp = Regexp.compile(@regexp) if @regexp - @exclude = Regexp.compile(@exclude) if @exclude + if @input_key + @regexp = Regexp.compile(@regexp) if @regexp + @exclude = Regexp.compile(@exclude) if @exclude + end - @threshold = @threshold.to_i if @threshold + @regexps = {} + (1..REGEXP_MAX_NUM).each do |i| + next unless conf["regexp#{i}"] + key, regexp = conf["regexp#{i}"].split(/ /, 2) + raise Fluent::ConfigError, "regexp#{i} does not contain 2 parameters" unless regexp + raise Fluent::ConfigError, "regexp#{i} contains a duplicated key, #{key}" if @regexps[key] + @regexps[key] = Regexp.compile(regexp) + end - unless ['>=', '<='].include?(@comparator) - raise Fluent::ConfigError, "grepcounter: comparator allows >=, <=" + @excludes = {} + (1..REGEXP_MAX_NUM).each do |i| + next unless conf["exclude#{i}"] + key, exclude = conf["exclude#{i}"].split(/ /, 2) + raise Fluent::ConfigError, "exclude#{i} does not contain 2 parameters" unless exclude + raise Fluent::ConfigError, "exclude#{i} contains a duplicated key, #{key}" if @excludes[key] + @excludes[key] = Regexp.compile(exclude) end + if @input_key and (!@regexps.empty? or !@excludes.empty?) + raise Fluent::ConfigError, "Classic style `input_key`, and new style `regexpN`, `excludeN` can not be used together" + end + + # to support obsolete options + @tag ||= @output_tag + @delimiter ||= @output_with_joined_delimiter + # to support obsolete `threshold` and `comparator` options if @threshold.nil? and @less_than.nil? and @less_equal.nil? and @greater_than.nil? and @greater_equal.nil? @threshold = 1 end - if @threshold and @comparator - if @comparator == '>=' + unless %w[>= <=].include?(@comparator) + raise Fluent::ConfigError, "grepcounter: comparator allows >=, <=" + end + if @threshold + case @comparator + when '>=' @greater_equal = @threshold else @less_equal = @threshold end end - # to support osolete `output_tag` option - @tag = @output_tag if !@tag and @output_tag - - # to support obsolete `output_with_joined_delimiter` option - @delimiter = @output_with_joined_delimiter if !@delimiter and @output_with_joined_delimiter - - unless ['tag', 'all'].include?(@aggregate) - raise Fluent::ConfigError, "grepcounter: aggregate allows tag/all" - end - - case @aggregate - when 'all' - raise Fluent::ConfigError, "grepcounter: output_tag must be specified with aggregate all" if @output_tag.nil? - when 'tag' - # raise Fluent::ConfigError, "grepcounter: add_tag_prefix must be specified with aggregate tag" if @add_tag_prefix.nil? - end - - if @store_file - f = Pathname.new(@store_file) - if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?) - raise Fluent::ConfigError, "#{@store_file} is not writable" - end - end - if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? @add_tag_prefix = 'count' # not ConfigError to support lower version compatibility end - @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix @tag_proc = if @tag Proc.new {|tag| @tag } @@ -100,10 +103,26 @@ Proc.new {|tag| "#{@tag_prefix}#{tag}" } else Proc.new {|tag| tag } end + case @aggregate + when 'all' + raise Fluent::ConfigError, "grepcounter: `tag` must be specified with aggregate all" if @tag.nil? + when 'tag' + # raise Fluent::ConfigError, "grepcounter: add_tag_prefix must be specified with aggregate tag" if @add_tag_prefix.nil? + else + raise Fluent::ConfigError, "grepcounter: aggregate allows tag/all" + end + + if @store_file + f = Pathname.new(@store_file) + if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?) + raise Fluent::ConfigError, "#{@store_file} is not writable" + end + end + @matches = {} @counts = {} @mutex = Mutex.new end @@ -123,14 +142,27 @@ # Called when new line comes. This method actually does not emit def emit(tag, es, chain) count = 0; matches = [] # filter out and insert es.each do |time,record| - value = record[@input_key] - next unless match(value.to_s) - matches << value - count += 1 + catch(:break_loop) do + if key = @input_key + value = record[key].to_s + throw :break_loop if @regexp and !match(@regexp, value) + throw :break_loop if @exclude and match(@exclude, value) + matches << value # old style stores as an array of values + else + @regexps.each do |key, regexp| + throw :break_loop unless match(regexp, record[key].to_s) + end + @excludes.each do |key, exclude| + throw :break_loop if match(exclude, record[key].to_s) + end + matches << record # new style stores as an array of hashes, but how to utilize it? + end + count += 1 + end end # thread safe merge @counts[tag] ||= 0 @matches[tag] ||= [] @mutex.synchronize do @@ -194,11 +226,16 @@ return nil if @less_equal and @less_equal < count return nil if @greater_than and count <= @greater_than return nil if @greater_equal and count < @greater_equal output = {} output['count'] = count - output['message'] = @delimiter.nil? ? matches : matches.join(@delimiter) + if @input_key + output['message'] = @delimiter ? matches.join(@delimiter) : matches + else + # I will think of good format later... + output['message'] = @delimiter ? matches.map{|hash| hash.to_hash}.join(@delimiter) : matches + end if tag output['input_tag'] = tag output['input_tag_last'] = tag.split('.').last end output @@ -206,13 +243,12 @@ def lstrip(string, substring) string.index(substring) == 0 ? string[substring.size..-1] : string end - def match(string) + def match(regexp, string) begin - return false if @regexp and !@regexp.match(string) - return false if @exclude and @exclude.match(string) + return regexp.match(string) rescue ArgumentError => e raise e unless e.message.index("invalid byte sequence in") == 0 string = replace_invalid_byte(string) retry end