lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.4.2 vs lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.5.0
- old
+ new
@@ -1,19 +1,23 @@
# encoding: UTF-8
class Fluent::GrepCounterOutput < Fluent::Output
Fluent::Plugin.register_output('grepcounter', self)
+ REGEXP_MAX_NUM = 20
+
def initialize
super
require 'pathname'
end
- config_param :input_key, :string
+ config_param :input_key, :string, :default => nil
config_param :regexp, :string, :default => nil
- config_param :count_interval, :time, :default => 5
config_param :exclude, :string, :default => nil
- config_param :threshold, :integer, :default => nil # obsolete
+ (1..REGEXP_MAX_NUM).each {|i| config_param :"regexp#{i}", :string, :default => nil }
+ (1..REGEXP_MAX_NUM).each {|i| config_param :"exclude#{i}", :string, :default => nil }
+ config_param :count_interval, :time, :default => 5
+ config_param :threshold, :integer, :default => nil # not obsolete, though
config_param :comparator, :string, :default => '>=' # obsolete
config_param :less_than, :float, :default => nil
config_param :less_equal, :float, :default => nil
config_param :greater_than, :float, :default => nil
config_param :greater_equal, :float, :default => nil
@@ -34,61 +38,60 @@
attr_accessor :last_checked
def configure(conf)
super
- @count_interval = @count_interval.to_i
- @input_key = @input_key.to_s
- @regexp = Regexp.compile(@regexp) if @regexp
- @exclude = Regexp.compile(@exclude) if @exclude
+ if @input_key
+ @regexp = Regexp.compile(@regexp) if @regexp
+ @exclude = Regexp.compile(@exclude) if @exclude
+ end
- @threshold = @threshold.to_i if @threshold
+ @regexps = {}
+ (1..REGEXP_MAX_NUM).each do |i|
+ next unless conf["regexp#{i}"]
+ key, regexp = conf["regexp#{i}"].split(/ /, 2)
+ raise Fluent::ConfigError, "regexp#{i} does not contain 2 parameters" unless regexp
+ raise Fluent::ConfigError, "regexp#{i} contains a duplicated key, #{key}" if @regexps[key]
+ @regexps[key] = Regexp.compile(regexp)
+ end
- unless ['>=', '<='].include?(@comparator)
- raise Fluent::ConfigError, "grepcounter: comparator allows >=, <="
+ @excludes = {}
+ (1..REGEXP_MAX_NUM).each do |i|
+ next unless conf["exclude#{i}"]
+ key, exclude = conf["exclude#{i}"].split(/ /, 2)
+ raise Fluent::ConfigError, "exclude#{i} does not contain 2 parameters" unless exclude
+ raise Fluent::ConfigError, "exclude#{i} contains a duplicated key, #{key}" if @excludes[key]
+ @excludes[key] = Regexp.compile(exclude)
end
+ if @input_key and (!@regexps.empty? or !@excludes.empty?)
+ raise Fluent::ConfigError, "Classic style `input_key`, and new style `regexpN`, `excludeN` can not be used together"
+ end
+
+ # to support obsolete options
+ @tag ||= @output_tag
+ @delimiter ||= @output_with_joined_delimiter
+
# to support obsolete `threshold` and `comparator` options
if @threshold.nil? and @less_than.nil? and @less_equal.nil? and @greater_than.nil? and @greater_equal.nil?
@threshold = 1
end
- if @threshold and @comparator
- if @comparator == '>='
+ unless %w[>= <=].include?(@comparator)
+ raise Fluent::ConfigError, "grepcounter: comparator allows >=, <="
+ end
+ if @threshold
+ case @comparator
+ when '>='
@greater_equal = @threshold
else
@less_equal = @threshold
end
end
- # to support osolete `output_tag` option
- @tag = @output_tag if !@tag and @output_tag
-
- # to support obsolete `output_with_joined_delimiter` option
- @delimiter = @output_with_joined_delimiter if !@delimiter and @output_with_joined_delimiter
-
- unless ['tag', 'all'].include?(@aggregate)
- raise Fluent::ConfigError, "grepcounter: aggregate allows tag/all"
- end
-
- case @aggregate
- when 'all'
- raise Fluent::ConfigError, "grepcounter: output_tag must be specified with aggregate all" if @output_tag.nil?
- when 'tag'
- # raise Fluent::ConfigError, "grepcounter: add_tag_prefix must be specified with aggregate tag" if @add_tag_prefix.nil?
- end
-
- if @store_file
- f = Pathname.new(@store_file)
- if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?)
- raise Fluent::ConfigError, "#{@store_file} is not writable"
- end
- end
-
if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil?
@add_tag_prefix = 'count' # not ConfigError to support lower version compatibility
end
-
@tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix
@tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix
@tag_proc =
if @tag
Proc.new {|tag| @tag }
@@ -100,10 +103,26 @@
Proc.new {|tag| "#{@tag_prefix}#{tag}" }
else
Proc.new {|tag| tag }
end
+ case @aggregate
+ when 'all'
+ raise Fluent::ConfigError, "grepcounter: `tag` must be specified with aggregate all" if @tag.nil?
+ when 'tag'
+ # raise Fluent::ConfigError, "grepcounter: add_tag_prefix must be specified with aggregate tag" if @add_tag_prefix.nil?
+ else
+ raise Fluent::ConfigError, "grepcounter: aggregate allows tag/all"
+ end
+
+ if @store_file
+ f = Pathname.new(@store_file)
+ if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?)
+ raise Fluent::ConfigError, "#{@store_file} is not writable"
+ end
+ end
+
@matches = {}
@counts = {}
@mutex = Mutex.new
end
@@ -123,14 +142,27 @@
# Called when new line comes. This method actually does not emit
def emit(tag, es, chain)
count = 0; matches = []
# filter out and insert
es.each do |time,record|
- value = record[@input_key]
- next unless match(value.to_s)
- matches << value
- count += 1
+ catch(:break_loop) do
+ if key = @input_key
+ value = record[key].to_s
+ throw :break_loop if @regexp and !match(@regexp, value)
+ throw :break_loop if @exclude and match(@exclude, value)
+ matches << value # old style stores as an array of values
+ else
+ @regexps.each do |key, regexp|
+ throw :break_loop unless match(regexp, record[key].to_s)
+ end
+ @excludes.each do |key, exclude|
+ throw :break_loop if match(exclude, record[key].to_s)
+ end
+ matches << record # new style stores as an array of hashes, but how to utilize it?
+ end
+ count += 1
+ end
end
# thread safe merge
@counts[tag] ||= 0
@matches[tag] ||= []
@mutex.synchronize do
@@ -194,11 +226,16 @@
return nil if @less_equal and @less_equal < count
return nil if @greater_than and count <= @greater_than
return nil if @greater_equal and count < @greater_equal
output = {}
output['count'] = count
- output['message'] = @delimiter.nil? ? matches : matches.join(@delimiter)
+ if @input_key
+ output['message'] = @delimiter ? matches.join(@delimiter) : matches
+ else
+ # I will think of good format later...
+ output['message'] = @delimiter ? matches.map{|hash| hash.to_hash}.join(@delimiter) : matches
+ end
if tag
output['input_tag'] = tag
output['input_tag_last'] = tag.split('.').last
end
output
@@ -206,13 +243,12 @@
def lstrip(string, substring)
string.index(substring) == 0 ? string[substring.size..-1] : string
end
- def match(string)
+ def match(regexp, string)
begin
- return false if @regexp and !@regexp.match(string)
- return false if @exclude and @exclude.match(string)
+ return regexp.match(string)
rescue ArgumentError => e
raise e unless e.message.index("invalid byte sequence in") == 0
string = replace_invalid_byte(string)
retry
end