lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.4.0 vs lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.4.1
- old
+ new
@@ -18,10 +18,11 @@
config_param :greater_than, :float, :default => nil
config_param :greater_equal, :float, :default => nil
config_param :output_tag, :string, :default => nil # obsolete
config_param :tag, :string, :default => nil
config_param :add_tag_prefix, :string, :default => 'count'
+ config_param :remove_tag_prefix, :string, :default => nil
config_param :output_with_joined_delimiter, :string, :default => nil # obsolete
config_param :delimiter, :string, :default => nil
config_param :aggregate, :string, :default => 'tag'
config_param :replace_invalid_sequence, :bool, :default => false
config_param :store_file, :string, :default => nil
@@ -80,10 +81,25 @@
if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?)
raise Fluent::ConfigError, "#{@store_file} is not writable"
end
end
+ @tag_prefix = "#{@add_tag_prefix}."
+ @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix
+ @tag_proc =
+ if @tag
+ Proc.new {|tag| @tag }
+ elsif @tag_prefix and @tag_prefix_match
+ Proc.new {|tag| "#{@tag_prefix}#{lstrip(tag, @tag_prefix_match)}" }
+ elsif @tag_prefix_match
+ Proc.new {|tag| lstrip(tag, @tag_prefix_match) }
+ elsif @tag_prefix
+ Proc.new {|tag| "#{@tag_prefix}#{tag}" }
+ else
+ Proc.new {|tag| tag }
+ end
+
@matches = {}
@counts = {}
@mutex = Mutex.new
end
@@ -157,12 +173,14 @@
else
flushed_counts.keys.each do |tag|
count = flushed_counts[tag]
matches = flushed_matches[tag]
output = generate_output(count, matches, tag)
- tag = @tag ? @tag : "#{@add_tag_prefix}.#{tag}"
- Fluent::Engine.emit(tag, time, output) if output
+ if output
+ emit_tag = @tag_proc.call(tag)
+ Fluent::Engine.emit(emit_tag, time, output)
+ end
end
end
end
def generate_output(count, matches, tag = nil)
@@ -180,20 +198,21 @@
output['input_tag_last'] = tag.split('.').last
end
output
end
+ def lstrip(string, substring)
+ string.index(substring) == 0 ? string[substring.size..-1] : string
+ end
+
def match(string)
begin
return false if @regexp and !@regexp.match(string)
return false if @exclude and @exclude.match(string)
rescue ArgumentError => e
- unless e.message.index("invalid byte sequence in") == 0
- raise
- end
+ raise e unless e.message.index("invalid byte sequence in") == 0
string = replace_invalid_byte(string)
- return false if @regexp and !@regexp.match(string)
- return false if @exclude and @exclude.match(string)
+ retry
end
return true
end
def replace_invalid_byte(string)