lib/fluent/plugin/out_stats_notifier.rb in fluent-plugin-stats-notifier-0.0.2 vs lib/fluent/plugin/out_stats_notifier.rb in fluent-plugin-stats-notifier-0.0.3
- old
+ new
@@ -13,10 +13,13 @@
config_param :less_equal, :float, :default => nil
config_param :greater_than, :float, :default => nil
config_param :greater_equal, :float, :default => nil
config_param :compare_with, :string, :default => "max"
config_param :tag, :string
+ config_param :add_tag_prefix, :string, :default => nil
+ config_param :remove_tag_prefix, :string, :default => nil
+ config_param :aggregate, :string, :default => 'all'
config_param :store_file, :string, :default => nil
attr_accessor :counts
attr_accessor :matches
attr_accessor :saved_duration
@@ -46,10 +49,34 @@
@compare_with = :avg
else
raise Fluent::ConfigError, "out_stats_notiifer: `compare_with` must be one of `sum`, `max`, `min`, `avg`"
end
+ case @aggregate
+ when 'all'
+ raise Fluent::ConfigError, "anomalydetect: `tag` must be specified with aggregate all" if @tag.nil?
+ when 'tag'
+ raise Fluent::ConfigError, "anomalydetect: `add_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil?
+ else
+ raise Fluent::ConfigError, "anomalydetect: aggregate allows tag/all"
+ end
+
+ @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix
+ @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix
+ @tag_proc =
+ if @tag_prefix and @tag_prefix_match
+ Proc.new {|tag| "#{@tag_prefix}#{lstrip(tag, @tag_prefix_match)}" }
+ elsif @tag_prefix_match
+ Proc.new {|tag| lstrip(tag, @tag_prefix_match) }
+ elsif @tag_prefix
+ Proc.new {|tag| "#{@tag_prefix}#{tag}" }
+ elsif @tag
+ Proc.new {|tag| @tag }
+ else
+ Proc.new {|tag| tag }
+ end
+
@counts = {}
@matches = {}
@mutex = Mutex.new
end
@@ -72,22 +99,22 @@
# stats
count = 0; matches = {}
es.each do |time,record|
if record[key]
- # @todo: make an option for statsuation in the same tag. now only sum is supported
+ # @todo: make an option for calcuation in the same tag. now only sum is supported
matches[key] = (matches[key] ? matches[key] + record[key] : record[key])
end
count += 1
end
# thread safe merge
@counts[tag] ||= 0
@matches[tag] ||= {}
@mutex.synchronize do
if matches[key]
- # @todo: make an option for statsuation in the same tag. now only sum is supported
+ # @todo: make an option for calcuation in the same tag. now only sum is supported
@matches[tag][key] = (@matches[tag][key] ? @matches[tag][key] + matches[key] : matches[key])
end
@counts[tag] += count
end
@@ -117,18 +144,26 @@
end
# This method is the real one to emit
def flush_emit(step)
time = Fluent::Engine.now
- flushed_counts, flushed_matches, @counts, @matches = @counts, @matches, {}, {}
+ counts, matches, @counts, @matches = @counts, @matches, {}, {}
- output = generate_output(flushed_counts, flushed_matches)
- Fluent::Engine.emit(@tag, time, output) if output
+ if @aggregate == 'all'
+ values = matches.values.map {|match| match[@target_key] }.compact
+ output = generate_output(values)
+ Fluent::Engine.emit(@tag, time, output) if output
+ else # aggregate tag
+ matches.each do |tag, match|
+ values = [match[@target_key]]
+ output = generate_output(values)
+ emit_tag = @tag_proc.call(tag)
+ Fluent::Engine.emit(emit_tag, time, output) if output
+ end
+ end
end
- def generate_output(counts, matches)
- values = matches.values.map {|match| match[@target_key] }.compact
-
+ def generate_output(values)
case @compare_with
when :sum
target_value = values.inject(:+)
when :max
target_value = values.max