lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.5.4 vs lib/fluent/plugin/out_grepcounter.rb in fluent-plugin-grepcounter-0.5.5

- old
+ new

@@ -30,10 +30,11 @@ config_param :tag, :string, :default => nil config_param :add_tag_prefix, :string, :default => nil config_param :remove_tag_prefix, :string, :default => nil config_param :add_tag_suffix, :string, :default => nil config_param :remove_tag_suffix, :string, :default => nil + config_param :remove_tag_slice, :string, :default => nil config_param :output_with_joined_delimiter, :string, :default => nil # obsolete config_param :delimiter, :string, :default => nil config_param :aggregate, :string, :default => 'tag' config_param :replace_invalid_sequence, :bool, :default => false config_param :store_file, :string, :default => nil @@ -92,23 +93,26 @@ else @less_equal = @threshold end end - if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil? + if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil? and @remove_tag_slice.nil? @add_tag_prefix = 'count' # not ConfigError to support lower version compatibility end @tag_proc = tag_proc case @aggregate when 'all' raise Fluent::ConfigError, "grepcounter: `tag` must be specified with aggregate all" if @tag.nil? - when 'tag' - # raise Fluent::ConfigError, "grepcounter: add_tag_prefix must be specified with aggregate tag" if @add_tag_prefix.nil? + when 'tag' # obsolete + @aggregate = 'in_tag' + when 'in_tag' + when 'out_tag' else - raise Fluent::ConfigError, "grepcounter: aggregate allows tag/all" + raise Fluent::ConfigError, "grepcounter: aggregate allows all/in_tag/out_tag" end + @aggregate_proc = aggregate_proc(@tag_proc) if @store_file f = Pathname.new(@store_file) if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?) raise Fluent::ConfigError, "#{@store_file} is not writable" @@ -154,16 +158,18 @@ matches << record # new style stores as an array of hashes, but how to utilize it? end count += 1 end end + + aggregate_key = @aggregate_proc.call(tag) # thread safe merge - @counts[tag] ||= 0 - @matches[tag] ||= [] + @counts[aggregate_key] ||= 0 + @matches[aggregate_key] ||= [] @mutex.synchronize do - @counts[tag] += count - @matches[tag] += matches + @counts[aggregate_key] += count + @matches[aggregate_key] += matches end chain.next rescue => e log.warn "grepcounter: #{e.class} #{e.message} #{e.backtrace.first}" @@ -190,26 +196,33 @@ # This method is the real one to emit def flush_emit(step) time = Fluent::Engine.now flushed_counts, flushed_matches, @counts, @matches = @counts, @matches, {}, {} - if @aggregate == 'all' - count = 0; matches = [] - flushed_counts.keys.each do |tag| - count += flushed_counts[tag] - matches += flushed_matches[tag] - end + case @aggregate + when 'all' + count = flushed_counts[:all] + matches = flushed_matches[:all] output = generate_output(count, matches) Fluent::Engine.emit(@tag, time, output) if output - else + when 'out_tag' + flushed_counts.keys.each do |out_tag| + count = flushed_counts[out_tag] + matches = flushed_matches[out_tag] + output = generate_output(count, matches) + if output + Fluent::Engine.emit(out_tag, time, output) + end + end + else # in_tag flushed_counts.keys.each do |tag| count = flushed_counts[tag] matches = flushed_matches[tag] output = generate_output(count, matches, tag) if output - emit_tag = @tag_proc.call(tag) - Fluent::Engine.emit(emit_tag, time, output) + out_tag = @tag_proc.call(tag) + Fluent::Engine.emit(out_tag, time, output) end end end end @@ -232,27 +245,50 @@ output['input_tag_last'] = tag.split('.').last end output end + def aggregate_proc(tag_proc) + case @aggregate + when 'all' + Proc.new {|tag| :all } + when 'in_tag' + Proc.new {|tag| tag } + when 'out_tag' + Proc.new {|tag| tag_proc.call(tag) } + end + end + def tag_proc + tag_slice_proc = + if @remove_tag_slice + lindex, rindex = @remove_tag_slice.split('..', 2) + if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/ + raise Fluent::ConfigError, "out_grepcounter: remove_tag_slice must be formatted like [num]..[num]" + end + l, r = lindex.to_i, rindex.to_i + Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') } + else + Proc.new {|tag| tag } + end + rstrip = Proc.new {|str, substr| str.chomp(substr) } lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str } tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix tag_fixed = @tag if @tag - if tag_fixed - Proc.new {|tag| tag_fixed } - elsif tag_prefix_match and tag_suffix_match - Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag, tag_prefix_match), tag_suffix_match)}#{tag_suffix}" } + if tag_prefix_match and tag_suffix_match + Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag_slice_proc.call(tag), tag_prefix_match), tag_suffix_match)}#{tag_suffix}" } elsif tag_prefix_match - Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag, tag_prefix_match)}#{tag_suffix}" } + Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag_slice_proc.call(tag), tag_prefix_match)}#{tag_suffix}" } elsif tag_suffix_match - Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag, tag_suffix_match)}#{tag_suffix}" } + Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag_slice_proc.call(tag), tag_suffix_match)}#{tag_suffix}" } + elsif tag_prefix || @remove_tag_slice || tag_suffix + Proc.new {|tag| "#{tag_prefix}#{tag_slice_proc.call(tag)}#{tag_suffix}" } else - Proc.new {|tag| "#{tag_prefix}#{tag}#{tag_suffix}" } + Proc.new {|tag| tag_fixed } end end def match(regexp, string) begin