lib/fluent/plugin/out_stats_notifier.rb in fluent-plugin-stats-notifier-0.0.3 vs lib/fluent/plugin/out_stats_notifier.rb in fluent-plugin-stats-notifier-0.0.4

- old
+ new

@@ -1,9 +1,14 @@ # encoding: UTF-8 class Fluent::StatsNotifierOutput < Fluent::Output Fluent::Plugin.register_output('stats_notifier', self) + # To support log_level option implemented by Fluentd v0.10.43 + unless method_defined?(:log) + define_method("log") { $log } + end + def initialize super require 'pathname' end @@ -11,74 +16,84 @@ config_param :interval, :time, :default => 5 config_param :less_than, :float, :default => nil config_param :less_equal, :float, :default => nil config_param :greater_than, :float, :default => nil config_param :greater_equal, :float, :default => nil - config_param :compare_with, :string, :default => "max" - config_param :tag, :string + config_param :stats, :string, :default => "max" + config_param :compare_with, :string, :default => nil # Obsolete. Use aggregate_stats + config_param :aggregate_stats, :string, :default => "max" # Work only with aggregate :all + config_param :tag, :string, :default => nil config_param :add_tag_prefix, :string, :default => nil config_param :remove_tag_prefix, :string, :default => nil + config_param :add_tag_suffix, :string, :default => nil + config_param :remove_tag_suffix, :string, :default => nil config_param :aggregate, :string, :default => 'all' config_param :store_file, :string, :default => nil attr_accessor :counts - attr_accessor :matches + attr_accessor :queues attr_accessor :saved_duration attr_accessor :saved_at attr_accessor :last_checked def configure(conf) super @interval = @interval.to_i if @less_than and @less_equal - raise Fluent::ConfigError, "out_stats_notiifer: Only either of `less_than` or `less_equal` can be specified." + raise Fluent::ConfigError, "out_stats_notifier: Only either of `less_than` or `less_equal` can be specified." end if @greater_than and @greater_equal - raise Fluent::ConfigError, "out_stats_notiifer: Only either of `greater_than` or `greater_equal` can be specified." + raise Fluent::ConfigError, "out_stats_notifier: Only either of `greater_than` or `greater_equal` can be specified." end - case @compare_with + @aggregate_stats = @compare_with if @compare_with # Support old version compatibility + case @aggregate_stats when "sum" - @compare_with = :sum + @aggregate_stats = :sum when "max" - @compare_with = :max + @aggregate_stats = :max when "min" - @compare_with = :min + @aggregate_stats = :min when "avg" - @compare_with = :avg + @aggregate_stats = :avg else - raise Fluent::ConfigError, "out_stats_notiifer: `compare_with` must be one of `sum`, `max`, `min`, `avg`" + raise Fluent::ConfigError, "out_stats_notifier: `aggregate_stats` must be one of `sum`, `max`, `min`, `avg`" end + case @stats + when "sum" + @stats = :sum + when "max" + @stats = :max + when "min" + @stats = :min + when "avg" + @stats = :avg + else + raise Fluent::ConfigError, "out_stats_notifier: `stats` must be one of `sum`, `max`, `min`, `avg`" + end + + if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil? + raise Fluent::ConfigError, "out_stats_notifier: No tag option is specified" + end + @tag_proc = tag_proc + case @aggregate when 'all' - raise Fluent::ConfigError, "anomalydetect: `tag` must be specified with aggregate all" if @tag.nil? + raise Fluent::ConfigError, "out_stats_notifier: `tag` must be specified with aggregate all" if @tag.nil? + @aggregate = :all when 'tag' - raise Fluent::ConfigError, "anomalydetect: `add_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? + # raise Fluent::ConfigError, "out_stats_notifier: `add_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? + @aggregate = :tag else - raise Fluent::ConfigError, "anomalydetect: aggregate allows tag/all" + raise Fluent::ConfigError, "out_stats_notifier: aggregate allows tag/all" end - @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix - @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix - @tag_proc = - if @tag_prefix and @tag_prefix_match - Proc.new {|tag| "#{@tag_prefix}#{lstrip(tag, @tag_prefix_match)}" } - elsif @tag_prefix_match - Proc.new {|tag| lstrip(tag, @tag_prefix_match) } - elsif @tag_prefix - Proc.new {|tag| "#{@tag_prefix}#{tag}" } - elsif @tag - Proc.new {|tag| @tag } - else - Proc.new {|tag| tag } - end - @counts = {} - @matches = {} + @queues = {} @mutex = Mutex.new end def start super @@ -95,34 +110,34 @@ # Called when new line comes. This method actually does not emit def emit(tag, es, chain) key = @target_key - # stats - count = 0; matches = {} + # enqueus + count = 0; queues = {} es.each do |time,record| if record[key] - # @todo: make an option for calcuation in the same tag. now only sum is supported - matches[key] = (matches[key] ? matches[key] + record[key] : record[key]) + queues[key] ||= [] + queues[key] << record[key] end count += 1 end # thread safe merge @counts[tag] ||= 0 - @matches[tag] ||= {} + @queues[tag] ||= {} @mutex.synchronize do - if matches[key] - # @todo: make an option for calcuation in the same tag. now only sum is supported - @matches[tag][key] = (@matches[tag][key] ? @matches[tag][key] + matches[key] : matches[key]) + if queues[key] + @queues[tag][key] ||= [] + @queues[tag][key].concat(queues[key]) end @counts[tag] += count end chain.next rescue => e - $log.warn "#{e.class} #{e.message} #{e.backtrace.first}" + log.warn "#{e.class} #{e.message} #{e.backtrace.first}" end # thread callback def watcher # instance variable, and public accessable, for test @@ -136,58 +151,88 @@ now = Fluent::Engine.now flush_emit(now - @last_checked) @last_checked = now end rescue => e - $log.warn "#{e.class} #{e.message} #{e.backtrace.first}" + log.warn "#{e.class} #{e.message} #{e.backtrace.first}" end end end # This method is the real one to emit def flush_emit(step) time = Fluent::Engine.now - counts, matches, @counts, @matches = @counts, @matches, {}, {} + counts, queues, @counts, @queues = @counts, @queues, {}, {} - if @aggregate == 'all' - values = matches.values.map {|match| match[@target_key] }.compact - output = generate_output(values) + # Get statistical value among events + evented_queues = {} + queues.each do |tag, queue| + evented_queues[tag] ||= {} + evented_queues[tag][@target_key] = get_stats(queue[@target_key], @stats) if queue[@target_key] + end + + if @aggregate == :all + values = evented_queues.values.map {|queue| queue[@target_key] }.compact + value = get_stats(values, @aggregate_stats) + output = generate_output(value) if value Fluent::Engine.emit(@tag, time, output) if output else # aggregate tag - matches.each do |tag, match| - values = [match[@target_key]] - output = generate_output(values) + evented_queues.each do |tag, queue| + value = queue[@target_key] + output = generate_output(value) if value emit_tag = @tag_proc.call(tag) Fluent::Engine.emit(emit_tag, time, output) if output end end end - def generate_output(values) - case @compare_with + def get_stats(values, method = :max) + case method when :sum - target_value = values.inject(:+) + stats = values.inject(:+) when :max - target_value = values.max + stats = values.max when :min - target_value = values.min + stats = values.min when :avg - target_value = values.inject(:+) / values.count unless values.empty? + stats = values.inject(:+) / values.count unless values.empty? end + end - return nil if target_value.nil? - return nil if target_value == 0 # ignore 0 because standby nodes receive 0 message usually - return nil if @less_than and @less_than <= target_value - return nil if @less_equal and @less_equal < target_value - return nil if @greater_than and target_value <= @greater_than - return nil if @greater_equal and target_value < @greater_equal + def generate_output(value) + return nil if value == 0 # ignore 0 because standby nodes receive 0 message usually + return nil if @less_than and @less_than <= value + return nil if @less_equal and @less_equal < value + return nil if @greater_than and value <= @greater_than + return nil if @greater_equal and value < @greater_equal output = {} - output[@target_key] = target_value + output[@target_key] = value output end + def tag_proc + rstrip = Proc.new {|str, substr| str.chomp(substr) } + lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str } + tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix + tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix + tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix + tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix + tag_fixed = @tag if @tag + if tag_fixed + Proc.new {|tag| tag_fixed } + elsif tag_prefix_match and tag_suffix_match + Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag, tag_prefix_match), tag_suffix_match)}#{tag_suffix}" } + elsif tag_prefix_match + Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag, tag_prefix_match)}#{tag_suffix}" } + elsif tag_suffix_match + Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag, tag_suffix_match)}#{tag_suffix}" } + else + Proc.new {|tag| "#{tag_prefix}#{tag}#{tag_suffix}" } + end + end + # Store internal status into a file # # @param [String] file_path def save_status(file_path) return unless file_path @@ -196,18 +241,18 @@ Pathname.new(file_path).open('wb') do |f| @saved_at = Fluent::Engine.now @saved_duration = @saved_at - @last_checked Marshal.dump({ :counts => @counts, - :matches => @matches, + :queues => @queues, :saved_at => @saved_at, :saved_duration => @saved_duration, :target_key => @target_key, }, f) end rescue => e - $log.warn "out_stats_notifier: Can't write store_file #{e.class} #{e.message}" + log.warn "out_stats_notifier: Can't write store_file #{e.class} #{e.message}" end end # Load internal status from a file # @@ -218,27 +263,30 @@ begin f.open('rb') do |f| stored = Marshal.load(f) if stored[:target_key] == @target_key + if stored[:queues] + if Fluent::Engine.now <= stored[:saved_at] + interval + @counts = stored[:counts] + @queues = stored[:queues] + @saved_at = stored[:saved_at] + @saved_duration = stored[:saved_duration] - if Fluent::Engine.now <= stored[:saved_at] + interval - @counts = stored[:counts] - @matches = stored[:matches] - @saved_at = stored[:saved_at] - @saved_duration = stored[:saved_duration] - - # skip the saved duration to continue counting - @last_checked = Fluent::Engine.now - @saved_duration + # skip the saved duration to continue counting + @last_checked = Fluent::Engine.now - @saved_duration + else + log.warn "out_stats_notifier: stored data is outdated. ignore stored data" + end else - $log.warn "out_stats_notifier: stored data is outdated. ignore stored data" + log.warn "out_stats_notifier: stored data is incompatible. ignore stored data" end else - $log.warn "out_stats_notiifer: configuration param was changed. ignore stored data" + log.warn "out_stats_notifier: configuration param was changed. ignore stored data" end end rescue => e - $log.warn "out_stats_notifier: Can't load store_file #{e.class} #{e.message}" + log.warn "out_stats_notifier: Can't load store_file #{e.class} #{e.message}" end end end