lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.5.4 vs lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.5.5
- old
+ new
@@ -30,10 +30,11 @@
config_param :tag, :string, :default => nil
config_param :add_tag_prefix, :string, :default => nil
config_param :remove_tag_prefix, :string, :default => nil
config_param :add_tag_suffix, :string, :default => nil
config_param :remove_tag_suffix, :string, :default => nil
+ config_param :remove_tag_slice, :string, :default => nil
config_param :output_with_joined_delimiter, :string, :default => nil # obsolete
config_param :delimiter, :string, :default => nil
config_param :aggregate, :string, :default => 'tag'
config_param :replace_invalid_sequence, :bool, :default => false
config_param :store_file, :string, :default => nil
@@ -92,23 +93,26 @@
else
@less_equal = @threshold
end
end
- if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil?
+ if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil? and @remove_tag_slice.nil?
@add_tag_prefix = 'count' # not ConfigError to support lower version compatibility
end
@tag_proc = tag_proc
case @aggregate
when 'all'
raise Fluent::ConfigError, "grepcounter: `tag` must be specified with aggregate all" if @tag.nil?
- when 'tag'
- # raise Fluent::ConfigError, "grepcounter: add_tag_prefix must be specified with aggregate tag" if @add_tag_prefix.nil?
+ when 'tag' # obsolete
+ @aggregate = 'in_tag'
+ when 'in_tag'
+ when 'out_tag'
else
- raise Fluent::ConfigError, "grepcounter: aggregate allows tag/all"
+ raise Fluent::ConfigError, "grepcounter: aggregate allows all/in_tag/out_tag"
end
+ @aggregate_proc = aggregate_proc(@tag_proc)
if @store_file
f = Pathname.new(@store_file)
if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?)
raise Fluent::ConfigError, "#{@store_file} is not writable"
@@ -154,16 +158,18 @@
matches << record # new style stores as an array of hashes, but how to utilize it?
end
count += 1
end
end
+
+ aggregate_key = @aggregate_proc.call(tag)
# thread safe merge
- @counts[tag] ||= 0
- @matches[tag] ||= []
+ @counts[aggregate_key] ||= 0
+ @matches[aggregate_key] ||= []
@mutex.synchronize do
- @counts[tag] += count
- @matches[tag] += matches
+ @counts[aggregate_key] += count
+ @matches[aggregate_key] += matches
end
chain.next
rescue => e
log.warn "grepcounter: #{e.class} #{e.message} #{e.backtrace.first}"
@@ -190,26 +196,33 @@
# This method is the real one to emit
def flush_emit(step)
time = Fluent::Engine.now
flushed_counts, flushed_matches, @counts, @matches = @counts, @matches, {}, {}
- if @aggregate == 'all'
- count = 0; matches = []
- flushed_counts.keys.each do |tag|
- count += flushed_counts[tag]
- matches += flushed_matches[tag]
- end
+ case @aggregate
+ when 'all'
+ count = flushed_counts[:all]
+ matches = flushed_matches[:all]
output = generate_output(count, matches)
Fluent::Engine.emit(@tag, time, output) if output
- else
+ when 'out_tag'
+ flushed_counts.keys.each do |out_tag|
+ count = flushed_counts[out_tag]
+ matches = flushed_matches[out_tag]
+ output = generate_output(count, matches)
+ if output
+ Fluent::Engine.emit(out_tag, time, output)
+ end
+ end
+ else # in_tag
flushed_counts.keys.each do |tag|
count = flushed_counts[tag]
matches = flushed_matches[tag]
output = generate_output(count, matches, tag)
if output
- emit_tag = @tag_proc.call(tag)
- Fluent::Engine.emit(emit_tag, time, output)
+ out_tag = @tag_proc.call(tag)
+ Fluent::Engine.emit(out_tag, time, output)
end
end
end
end
@@ -232,27 +245,50 @@
output['input_tag_last'] = tag.split('.').last
end
output
end
+ def aggregate_proc(tag_proc)
+ case @aggregate
+ when 'all'
+ Proc.new {|tag| :all }
+ when 'in_tag'
+ Proc.new {|tag| tag }
+ when 'out_tag'
+ Proc.new {|tag| tag_proc.call(tag) }
+ end
+ end
+
def tag_proc
+ tag_slice_proc =
+ if @remove_tag_slice
+ lindex, rindex = @remove_tag_slice.split('..', 2)
+ if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/
+ raise Fluent::ConfigError, "out_grepcounter: remove_tag_slice must be formatted like [num]..[num]"
+ end
+ l, r = lindex.to_i, rindex.to_i
+ Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') }
+ else
+ Proc.new {|tag| tag }
+ end
+
rstrip = Proc.new {|str, substr| str.chomp(substr) }
lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str }
tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix
tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix
tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix
tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix
tag_fixed = @tag if @tag
- if tag_fixed
- Proc.new {|tag| tag_fixed }
- elsif tag_prefix_match and tag_suffix_match
- Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag, tag_prefix_match), tag_suffix_match)}#{tag_suffix}" }
+ if tag_prefix_match and tag_suffix_match
+ Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag_slice_proc.call(tag), tag_prefix_match), tag_suffix_match)}#{tag_suffix}" }
elsif tag_prefix_match
- Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag, tag_prefix_match)}#{tag_suffix}" }
+ Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag_slice_proc.call(tag), tag_prefix_match)}#{tag_suffix}" }
elsif tag_suffix_match
- Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag, tag_suffix_match)}#{tag_suffix}" }
+ Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag_slice_proc.call(tag), tag_suffix_match)}#{tag_suffix}" }
+ elsif tag_prefix || @remove_tag_slice || tag_suffix
+ Proc.new {|tag| "#{tag_prefix}#{tag_slice_proc.call(tag)}#{tag_suffix}" }
else
- Proc.new {|tag| "#{tag_prefix}#{tag}#{tag_suffix}" }
+ Proc.new {|tag| tag_fixed }
end
end
def match(regexp, string)
begin