lib/fluent/plugin/out_stats_notifier.rb in fluent-plugin-stats-notifier-0.0.3 vs lib/fluent/plugin/out_stats_notifier.rb in fluent-plugin-stats-notifier-0.0.4
- old
+ new
@@ -1,9 +1,14 @@
# encoding: UTF-8
class Fluent::StatsNotifierOutput < Fluent::Output
Fluent::Plugin.register_output('stats_notifier', self)
+ # To support log_level option implemented by Fluentd v0.10.43
+ unless method_defined?(:log)
+ define_method("log") { $log }
+ end
+
def initialize
super
require 'pathname'
end
@@ -11,74 +16,84 @@
config_param :interval, :time, :default => 5
config_param :less_than, :float, :default => nil
config_param :less_equal, :float, :default => nil
config_param :greater_than, :float, :default => nil
config_param :greater_equal, :float, :default => nil
- config_param :compare_with, :string, :default => "max"
- config_param :tag, :string
+ config_param :stats, :string, :default => "max"
+ config_param :compare_with, :string, :default => nil # Obsolete. Use aggregate_stats
+ config_param :aggregate_stats, :string, :default => "max" # Work only with aggregate :all
+ config_param :tag, :string, :default => nil
config_param :add_tag_prefix, :string, :default => nil
config_param :remove_tag_prefix, :string, :default => nil
+ config_param :add_tag_suffix, :string, :default => nil
+ config_param :remove_tag_suffix, :string, :default => nil
config_param :aggregate, :string, :default => 'all'
config_param :store_file, :string, :default => nil
attr_accessor :counts
- attr_accessor :matches
+ attr_accessor :queues
attr_accessor :saved_duration
attr_accessor :saved_at
attr_accessor :last_checked
def configure(conf)
super
@interval = @interval.to_i
if @less_than and @less_equal
- raise Fluent::ConfigError, "out_stats_notiifer: Only either of `less_than` or `less_equal` can be specified."
+ raise Fluent::ConfigError, "out_stats_notifier: Only either of `less_than` or `less_equal` can be specified."
end
if @greater_than and @greater_equal
- raise Fluent::ConfigError, "out_stats_notiifer: Only either of `greater_than` or `greater_equal` can be specified."
+ raise Fluent::ConfigError, "out_stats_notifier: Only either of `greater_than` or `greater_equal` can be specified."
end
- case @compare_with
+ @aggregate_stats = @compare_with if @compare_with # Support old version compatibility
+ case @aggregate_stats
when "sum"
- @compare_with = :sum
+ @aggregate_stats = :sum
when "max"
- @compare_with = :max
+ @aggregate_stats = :max
when "min"
- @compare_with = :min
+ @aggregate_stats = :min
when "avg"
- @compare_with = :avg
+ @aggregate_stats = :avg
else
- raise Fluent::ConfigError, "out_stats_notiifer: `compare_with` must be one of `sum`, `max`, `min`, `avg`"
+ raise Fluent::ConfigError, "out_stats_notifier: `aggregate_stats` must be one of `sum`, `max`, `min`, `avg`"
end
+ case @stats
+ when "sum"
+ @stats = :sum
+ when "max"
+ @stats = :max
+ when "min"
+ @stats = :min
+ when "avg"
+ @stats = :avg
+ else
+ raise Fluent::ConfigError, "out_stats_notifier: `stats` must be one of `sum`, `max`, `min`, `avg`"
+ end
+
+ if @tag.nil? and @add_tag_prefix.nil? and @remove_tag_prefix.nil? and @add_tag_suffix.nil? and @remove_tag_suffix.nil?
+ raise Fluent::ConfigError, "out_stats_notifier: No tag option is specified"
+ end
+ @tag_proc = tag_proc
+
case @aggregate
when 'all'
- raise Fluent::ConfigError, "anomalydetect: `tag` must be specified with aggregate all" if @tag.nil?
+ raise Fluent::ConfigError, "out_stats_notifier: `tag` must be specified with aggregate all" if @tag.nil?
+ @aggregate = :all
when 'tag'
- raise Fluent::ConfigError, "anomalydetect: `add_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil?
+ # raise Fluent::ConfigError, "out_stats_notifier: `add_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil?
+ @aggregate = :tag
else
- raise Fluent::ConfigError, "anomalydetect: aggregate allows tag/all"
+ raise Fluent::ConfigError, "out_stats_notifier: aggregate allows tag/all"
end
- @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix
- @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix
- @tag_proc =
- if @tag_prefix and @tag_prefix_match
- Proc.new {|tag| "#{@tag_prefix}#{lstrip(tag, @tag_prefix_match)}" }
- elsif @tag_prefix_match
- Proc.new {|tag| lstrip(tag, @tag_prefix_match) }
- elsif @tag_prefix
- Proc.new {|tag| "#{@tag_prefix}#{tag}" }
- elsif @tag
- Proc.new {|tag| @tag }
- else
- Proc.new {|tag| tag }
- end
-
@counts = {}
- @matches = {}
+ @queues = {}
@mutex = Mutex.new
end
def start
super
@@ -95,34 +110,34 @@
# Called when new line comes. This method actually does not emit
def emit(tag, es, chain)
key = @target_key
- # stats
- count = 0; matches = {}
+ # enqueus
+ count = 0; queues = {}
es.each do |time,record|
if record[key]
- # @todo: make an option for calcuation in the same tag. now only sum is supported
- matches[key] = (matches[key] ? matches[key] + record[key] : record[key])
+ queues[key] ||= []
+ queues[key] << record[key]
end
count += 1
end
# thread safe merge
@counts[tag] ||= 0
- @matches[tag] ||= {}
+ @queues[tag] ||= {}
@mutex.synchronize do
- if matches[key]
- # @todo: make an option for calcuation in the same tag. now only sum is supported
- @matches[tag][key] = (@matches[tag][key] ? @matches[tag][key] + matches[key] : matches[key])
+ if queues[key]
+ @queues[tag][key] ||= []
+ @queues[tag][key].concat(queues[key])
end
@counts[tag] += count
end
chain.next
rescue => e
- $log.warn "#{e.class} #{e.message} #{e.backtrace.first}"
+ log.warn "#{e.class} #{e.message} #{e.backtrace.first}"
end
# thread callback
def watcher
# instance variable, and public accessable, for test
@@ -136,58 +151,88 @@
now = Fluent::Engine.now
flush_emit(now - @last_checked)
@last_checked = now
end
rescue => e
- $log.warn "#{e.class} #{e.message} #{e.backtrace.first}"
+ log.warn "#{e.class} #{e.message} #{e.backtrace.first}"
end
end
end
# This method is the real one to emit
def flush_emit(step)
time = Fluent::Engine.now
- counts, matches, @counts, @matches = @counts, @matches, {}, {}
+ counts, queues, @counts, @queues = @counts, @queues, {}, {}
- if @aggregate == 'all'
- values = matches.values.map {|match| match[@target_key] }.compact
- output = generate_output(values)
+ # Get statistical value among events
+ evented_queues = {}
+ queues.each do |tag, queue|
+ evented_queues[tag] ||= {}
+ evented_queues[tag][@target_key] = get_stats(queue[@target_key], @stats) if queue[@target_key]
+ end
+
+ if @aggregate == :all
+ values = evented_queues.values.map {|queue| queue[@target_key] }.compact
+ value = get_stats(values, @aggregate_stats)
+ output = generate_output(value) if value
Fluent::Engine.emit(@tag, time, output) if output
else # aggregate tag
- matches.each do |tag, match|
- values = [match[@target_key]]
- output = generate_output(values)
+ evented_queues.each do |tag, queue|
+ value = queue[@target_key]
+ output = generate_output(value) if value
emit_tag = @tag_proc.call(tag)
Fluent::Engine.emit(emit_tag, time, output) if output
end
end
end
- def generate_output(values)
- case @compare_with
+ def get_stats(values, method = :max)
+ case method
when :sum
- target_value = values.inject(:+)
+ stats = values.inject(:+)
when :max
- target_value = values.max
+ stats = values.max
when :min
- target_value = values.min
+ stats = values.min
when :avg
- target_value = values.inject(:+) / values.count unless values.empty?
+ stats = values.inject(:+) / values.count unless values.empty?
end
+ end
- return nil if target_value.nil?
- return nil if target_value == 0 # ignore 0 because standby nodes receive 0 message usually
- return nil if @less_than and @less_than <= target_value
- return nil if @less_equal and @less_equal < target_value
- return nil if @greater_than and target_value <= @greater_than
- return nil if @greater_equal and target_value < @greater_equal
+ def generate_output(value)
+ return nil if value == 0 # ignore 0 because standby nodes receive 0 message usually
+ return nil if @less_than and @less_than <= value
+ return nil if @less_equal and @less_equal < value
+ return nil if @greater_than and value <= @greater_than
+ return nil if @greater_equal and value < @greater_equal
output = {}
- output[@target_key] = target_value
+ output[@target_key] = value
output
end
+ def tag_proc
+ rstrip = Proc.new {|str, substr| str.chomp(substr) }
+ lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str }
+ tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix
+ tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix
+ tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix
+ tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix
+ tag_fixed = @tag if @tag
+ if tag_fixed
+ Proc.new {|tag| tag_fixed }
+ elsif tag_prefix_match and tag_suffix_match
+ Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag, tag_prefix_match), tag_suffix_match)}#{tag_suffix}" }
+ elsif tag_prefix_match
+ Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag, tag_prefix_match)}#{tag_suffix}" }
+ elsif tag_suffix_match
+ Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag, tag_suffix_match)}#{tag_suffix}" }
+ else
+ Proc.new {|tag| "#{tag_prefix}#{tag}#{tag_suffix}" }
+ end
+ end
+
# Store internal status into a file
#
# @param [String] file_path
def save_status(file_path)
return unless file_path
@@ -196,18 +241,18 @@
Pathname.new(file_path).open('wb') do |f|
@saved_at = Fluent::Engine.now
@saved_duration = @saved_at - @last_checked
Marshal.dump({
:counts => @counts,
- :matches => @matches,
+ :queues => @queues,
:saved_at => @saved_at,
:saved_duration => @saved_duration,
:target_key => @target_key,
}, f)
end
rescue => e
- $log.warn "out_stats_notifier: Can't write store_file #{e.class} #{e.message}"
+ log.warn "out_stats_notifier: Can't write store_file #{e.class} #{e.message}"
end
end
# Load internal status from a file
#
@@ -218,27 +263,30 @@
begin
f.open('rb') do |f|
stored = Marshal.load(f)
if stored[:target_key] == @target_key
+ if stored[:queues]
+ if Fluent::Engine.now <= stored[:saved_at] + interval
+ @counts = stored[:counts]
+ @queues = stored[:queues]
+ @saved_at = stored[:saved_at]
+ @saved_duration = stored[:saved_duration]
- if Fluent::Engine.now <= stored[:saved_at] + interval
- @counts = stored[:counts]
- @matches = stored[:matches]
- @saved_at = stored[:saved_at]
- @saved_duration = stored[:saved_duration]
-
- # skip the saved duration to continue counting
- @last_checked = Fluent::Engine.now - @saved_duration
+ # skip the saved duration to continue counting
+ @last_checked = Fluent::Engine.now - @saved_duration
+ else
+ log.warn "out_stats_notifier: stored data is outdated. ignore stored data"
+ end
else
- $log.warn "out_stats_notifier: stored data is outdated. ignore stored data"
+ log.warn "out_stats_notifier: stored data is incompatible. ignore stored data"
end
else
- $log.warn "out_stats_notiifer: configuration param was changed. ignore stored data"
+ log.warn "out_stats_notifier: configuration param was changed. ignore stored data"
end
end
rescue => e
- $log.warn "out_stats_notifier: Can't load store_file #{e.class} #{e.message}"
+ log.warn "out_stats_notifier: Can't load store_file #{e.class} #{e.message}"
end
end
end