lib/fluent/plugin/out_stats_notifier.rb in fluent-plugin-stats-notifier-0.0.2 vs lib/fluent/plugin/out_stats_notifier.rb in fluent-plugin-stats-notifier-0.0.3

- old
+ new

@@ -13,10 +13,13 @@ config_param :less_equal, :float, :default => nil config_param :greater_than, :float, :default => nil config_param :greater_equal, :float, :default => nil config_param :compare_with, :string, :default => "max" config_param :tag, :string + config_param :add_tag_prefix, :string, :default => nil + config_param :remove_tag_prefix, :string, :default => nil + config_param :aggregate, :string, :default => 'all' config_param :store_file, :string, :default => nil attr_accessor :counts attr_accessor :matches attr_accessor :saved_duration @@ -46,10 +49,34 @@ @compare_with = :avg else raise Fluent::ConfigError, "out_stats_notiifer: `compare_with` must be one of `sum`, `max`, `min`, `avg`" end + case @aggregate + when 'all' + raise Fluent::ConfigError, "anomalydetect: `tag` must be specified with aggregate all" if @tag.nil? + when 'tag' + raise Fluent::ConfigError, "anomalydetect: `add_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? + else + raise Fluent::ConfigError, "anomalydetect: aggregate allows tag/all" + end + + @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix + @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix + @tag_proc = + if @tag_prefix and @tag_prefix_match + Proc.new {|tag| "#{@tag_prefix}#{lstrip(tag, @tag_prefix_match)}" } + elsif @tag_prefix_match + Proc.new {|tag| lstrip(tag, @tag_prefix_match) } + elsif @tag_prefix + Proc.new {|tag| "#{@tag_prefix}#{tag}" } + elsif @tag + Proc.new {|tag| @tag } + else + Proc.new {|tag| tag } + end + @counts = {} @matches = {} @mutex = Mutex.new end @@ -72,22 +99,22 @@ # stats count = 0; matches = {} es.each do |time,record| if record[key] - # @todo: make an option for statsuation in the same tag. now only sum is supported + # @todo: make an option for calcuation in the same tag. now only sum is supported matches[key] = (matches[key] ? matches[key] + record[key] : record[key]) end count += 1 end # thread safe merge @counts[tag] ||= 0 @matches[tag] ||= {} @mutex.synchronize do if matches[key] - # @todo: make an option for statsuation in the same tag. now only sum is supported + # @todo: make an option for calcuation in the same tag. now only sum is supported @matches[tag][key] = (@matches[tag][key] ? @matches[tag][key] + matches[key] : matches[key]) end @counts[tag] += count end @@ -117,18 +144,26 @@ end # This method is the real one to emit def flush_emit(step) time = Fluent::Engine.now - flushed_counts, flushed_matches, @counts, @matches = @counts, @matches, {}, {} + counts, matches, @counts, @matches = @counts, @matches, {}, {} - output = generate_output(flushed_counts, flushed_matches) - Fluent::Engine.emit(@tag, time, output) if output + if @aggregate == 'all' + values = matches.values.map {|match| match[@target_key] }.compact + output = generate_output(values) + Fluent::Engine.emit(@tag, time, output) if output + else # aggregate tag + matches.each do |tag, match| + values = [match[@target_key]] + output = generate_output(values) + emit_tag = @tag_proc.call(tag) + Fluent::Engine.emit(emit_tag, time, output) if output + end + end end - def generate_output(counts, matches) - values = matches.values.map {|match| match[@target_key] }.compact - + def generate_output(values) case @compare_with when :sum target_value = values.inject(:+) when :max target_value = values.max