require "fluent/plugin/output" class Fluent::Plugin::NotifierOutput < Fluent::Plugin::Output Fluent::Plugin.register_output('notifier', self) helpers :event_emitter NOTIFICATION_LEVELS = ['OK', 'WARN', 'CRIT', 'LOST'].freeze STATES_CLEAN_INTERVAL = 3600 # 1hours STATES_EXPIRE_SECONDS = 14400 # 4hours config_param :default_tag, :string, default: 'notification' config_param :default_tag_warn, :string, default: nil config_param :default_tag_crit, :string, default: nil config_param :default_intervals, :array, value_type: :time, default: [60, 300, 1800] config_param :default_repetitions, :array, value_type: :integer, default: [5, 5] config_param :default_interval_1st, :time, default: nil config_param :default_interval_2nd, :time, default: nil config_param :default_interval_3rd, :time, default: nil config_param :default_repetitions_1st, :integer, default: nil config_param :default_repetitions_2nd, :integer, default: nil config_param :input_tag_remove_prefix, :string, default: nil config_section :test, multi: true, param_name: :test_configs do config_param :check, :enum, list: [:tag, :numeric, :regexp] config_param :target_key, :string, default: nil config_param :lower_threshold, :float, default: nil config_param :upper_threshold, :float, default: nil config_param :include_pattern, :string, default: nil config_param :exclude_pattern, :string, default: nil end config_section :def, multi: true, param_name: :def_configs do config_param :pattern, :string config_param :check, :enum, list: [:numeric_upward, :numeric_downward, :string_find] config_param :target_keys, :array, value_type: :string, default: nil config_param :target_key_pattern, :string, default: nil config_param :exclude_key_pattern, :string, default: '^$' config_param :tag, :string, default: nil config_param :tag_warn, :string, default: nil config_param :tag_crit, :string, default: nil # numeric_upward/downward config_param :crit_threshold, :float, default: nil config_param :warn_threshold, :float, default: nil # string_find config_param :crit_regexp, :string, default: nil config_param :warn_regexp, :string, default: nil # repeat & interval config_param :intervals, :array, value_type: :time, default: nil config_param :interval_1st, :time, default: nil config_param :interval_2nd, :time, default: nil config_param :interval_3rd, :time, default: nil config_param :repetitions, :array, value_type: :integer, default: nil config_param :repetitions_1st, :integer, default: nil config_param :repetitions_2nd, :integer, default: nil end attr_accessor :tests, :defs, :states, :match_cache, :negative_cache ### output # { # 'pattern' => 'http_status_errors', # 'target_tag' => 'httpstatus.blog', # 'target_key' => 'blog_5xx_percentage', # 'check_type' => 'numeric_upward' # 'level' => 'warn', # 'threshold' => 25, # or 'regexp' => ...., # 'value' => 49, # or 'value' => 'matched some string...', # 'message_time' => Time.instance # } # <match httpstatus.blog> # type notifier # default_tag notification # default_interval_1st 1m # default_repetitions_1st 5 # default_interval_2nd 5m # default_repetitions_2nd 5 # default_interval_3rd 30m # <test> # check numeric # target_key xxx # lower_threshold xxx # upper_threshold xxx # </test> # <test> # check regexp # target_key xxx # include_pattern ^.$ # exclude_pattern ^.$ # </test> # <def> # pattern http_status_errors # check numeric_upward # warn_threshold 25 # crit_threshold 50 # tag alert # # tag_warn alert.warn # # tag_crit alert.crit # # target_keys blog_5xx_percentage # target_key_pattern ^.*_5xx_percentage$ # </def> # <def> # pattern log_checker # check string_find # crit_pattern 'ERROR' # warn_pattern 'WARNING' # tag alert # # target_keys message # target_key_pattern ^.*_message$ # </def> # </match> def configure(conf) super @match_cache = {} # cache which has map (fieldname => definition(s)) @negative_cache = {} @tests = [] @defs = [] @states = {} # key: tag+field ? if @input_tag_remove_prefix @input_tag_remove_prefix_string = @input_tag_remove_prefix + '.' @input_tag_remove_prefix_length = @input_tag_remove_prefix_string.length end if @default_interval_1st || @default_interval_2nd || @default_interval_3rd @default_intervals = [ @default_interval_1st || @default_intervals[0], @default_interval_2nd || @default_intervals[1], @default_interval_3rd || @default_intervals[2], ] end if @default_repetitions_1st || @default_repetitions_2nd @default_repetitions = [ @default_repetitions_1st || @default_repetitions[0], @default_repetitions_2nd || @default_repetitions[1], ] end @test_configs.each do |test_config| @tests << Test.new(test_config) end @def_configs.each do |def_config| @defs << Definition.new(def_config, self) end end def multi_workers_ready? true end def start super @mutex = Mutex.new @last_status_cleaned = Fluent::Engine.now end def suppressed_emit(notifications) now = Fluent::Engine.now notifications.each do |n| hashkey = n.delete(:hashkey) definition = n.delete(:match_def) tag = n.delete(:emit_tag) state = @states[hashkey] if state unless state.suppress?(definition, n) router.emit(tag, now, n) state.update_notified(definition, n) end else router.emit(tag, now, n) @states[hashkey] = State.new(n) end end end def states_cleanup now = Fluent::Engine.now @states.keys.each do |key| if now - @states[key].last_notified > STATES_EXPIRE_SECONDS @states.delete(key) end end end def check(tag, es) notifications = [] tag = if @input_tag_remove_prefix and tag.start_with?(@input_tag_remove_prefix_string) and tag.length > @input_tag_remove_prefix_length tag[@input_tag_remove_prefix_length..-1] else tag end es.each do |time,record| record.keys.each do |key| next if @negative_cache[key] defs = @match_cache[key] unless defs defs = [] @defs.each do |d| defs.push(d) if d.match?(key) end @negative_cache[key] = true if defs.size < 1 end defs.each do |d| next unless @tests.reduce(true){|r,t| r and t.test(tag, record)} alert = d.check(tag, time, record, key) if alert notifications.push(alert) end end end end notifications end def process(tag, es) notifications = check(tag, es) if notifications.size > 0 @mutex.synchronize do suppressed_emit(notifications) end end if Fluent::Engine.now - @last_status_cleaned > STATES_CLEAN_INTERVAL @mutex.synchronize do states_cleanup @last_status_cleaned = Fluent::Engine.now end end end class Test attr_accessor :check, :target_key attr_accessor :lower_threshold, :upper_threshold attr_accessor :include_pattern, :exclude_pattern def initialize(section) @check = section.check @target_key = section.target_key case @check when :tag if !section.include_pattern && !section.exclude_pattern raise Fluent::ConfigError, "At least one of include_pattern or exclude_pattern must be specified for 'check tag'" end @include_pattern = section.include_pattern ? Regexp.compile(section.include_pattern) : nil @exclude_pattern = section.exclude_pattern ? Regexp.compile(section.exclude_pattern) : nil when :numeric if !section.lower_threshold && !section.upper_threshold raise Fluent::ConfigError, "At least one of lower_threshold or upper_threshold must be specified for 'check numeric'" end raise Fluent::ConfigError, "'target_key' is needed for 'check numeric'" unless @target_key @lower_threshold = section.lower_threshold @upper_threshold = section.upper_threshold when :regexp if !section.include_pattern && !section.exclude_pattern raise Fluent::ConfigError, "At least one of include_pattern or exclude_pattern must be specified for 'check regexp'" end raise Fluent::ConfigError, "'target_key' is needed for 'check regexp'" unless @target_key @include_pattern = section.include_pattern ? Regexp.compile(section.include_pattern) : nil @exclude_pattern = section.exclude_pattern ? Regexp.compile(section.exclude_pattern) : nil else raise "BUG: unknown check: #{@check}" end end def test(tag, record) v = case @check when :numeric, :regexp record[@target_key] when :tag tag end return false if v.nil? case @check when :numeric v = v.to_f (@lower_threshold.nil? or @lower_threshold <= v) and (@upper_threshold.nil? or v <= @upper_threshold) when :tag, :regexp v = v.to_s.force_encoding('ASCII-8BIT') ((@include_pattern.nil? or @include_pattern.match(v)) and (@exclude_pattern.nil? or (not @exclude_pattern.match(v)))) or false end end end class Definition attr_accessor :tag, :tag_warn, :tag_crit attr_accessor :intervals, :repetitions attr_accessor :pattern, :target_keys, :target_key_pattern, :exclude_key_pattern attr_accessor :crit_threshold, :warn_threshold # for 'numeric_upward', 'numeric_downward' attr_accessor :crit_regexp, :warn_regexp # for 'string_find' def initialize(section, plugin) @pattern = section.pattern @tag = section.tag || plugin.default_tag @tag_warn = section.tag_warn || plugin.default_tag_warn @tag_crit = section.tag_crit || plugin.default_tag_crit @target_keys = section.target_keys @target_key_pattern = section.target_key_pattern ? Regexp.compile(section.target_key_pattern) : nil @exclude_key_pattern = section.exclude_key_pattern ? Regexp.compile(section.exclude_key_pattern) : nil if !@target_keys and !@target_key_pattern raise Fluent::ConfigError, "out_notifier needs one of target_keys or target_key_pattern in <def>" end case section.check when :numeric_upward @check = :upward if !section.crit_threshold || !section.warn_threshold raise Fluent::ConfigError, "Both of crit_threshold and warn_threshold must be specified for 'check numeric_upward'" end @crit_threshold = section.crit_threshold @warn_threshold = section.warn_threshold when :numeric_downward @check = :downward if !section.crit_threshold || !section.warn_threshold raise Fluent::ConfigError, "Both of crit_threshold and warn_threshold must be specified for 'check numeric_downward'" end @crit_threshold = section.crit_threshold @warn_threshold = section.warn_threshold when :string_find @check = :find if !section.crit_regexp || !section.warn_regexp raise Fluent::ConfigError, "Both of crit_regexp and warn_regexp must be specified for 'check string_find'" end @crit_regexp = Regexp.compile(section.crit_regexp) @warn_regexp = Regexp.compile(section.warn_regexp) else raise "BUG: unknown check: #{section.check}" end @intervals = if section.intervals section.intervals elsif section.interval_1st || section.interval_2nd || section.interval_3rd [section.interval_1st || plugin.default_intervals[0], section.interval_2nd || plugin.default_intervals[1], section.interval_3rd || plugin.default_intervals[2]] else plugin.default_intervals end @repetitions = if section.repetitions section.repetitions elsif section.repetitions_1st || section.repetitions_2nd [section.repetitions_1st || plugin.default_repetitions[0], section.repetitions_2nd || plugin.default_repetitions[1]] else plugin.default_repetitions end end def match?(key) if @target_keys @target_keys.include?(key) elsif @target_key_pattern @target_key_pattern.match(key) and not @exclude_key_pattern.match(key) end end # { # 'pattern' => 'http_status_errors', # 'target_tag' => 'httpstatus.blog', # 'target_key' => 'blog_5xx_percentage', # 'check_type' => 'numeric_upward' # 'level' => 'warn', # 'regexp' => '[WARN] .* MUST BE CHECKED!$' # 'threshold' => 25, # 'value' => 49, # 'value' => '2012/05/15 18:01:59 [WARN] wooooops, MUST BE CHECKED!' # 'message_time' => Time.instance # } def check(tag, time, record, key) case @check when :upward value = record[key].to_f if @crit_threshold and value >= @crit_threshold { :hashkey => @pattern + "\t" + tag + "\t" + key, :match_def => self, :emit_tag => (@tag_crit || @tag), 'pattern' => @pattern, 'target_tag' => tag, 'target_key' => key, 'check_type' => 'numeric_upward', 'level' => 'crit', 'threshold' => @crit_threshold, 'value' => value, 'message_time' => Time.at(time).to_s } elsif @warn_threshold and value >= @warn_threshold { :hashkey => @pattern + "\t" + tag + "\t" + key, :match_def => self, :emit_tag => (@tag_warn || @tag), 'pattern' => @pattern, 'target_tag' => tag, 'target_key' => key, 'check_type' => 'numeric_upward', 'level' => 'warn', 'threshold' => @warn_threshold, 'value' => value, 'message_time' => Time.at(time).to_s } else nil end when :downward value = record[key].to_f if @crit_threshold and value <= @crit_threshold { :hashkey => @pattern + "\t" + tag + "\t" + key, :match_def => self, :emit_tag => (@tag_crit || @tag), 'pattern' => @pattern, 'target_tag' => tag, 'target_key' => key, 'check_type' => 'numeric_downward', 'level' => 'crit', 'threshold' => @crit_threshold, 'value' => value, 'message_time' => Time.at(time).to_s } elsif @warn_threshold and value <= @warn_threshold { :hashkey => @pattern + "\t" + tag + "\t" + key, :match_def => self, :emit_tag => (@tag_warn || @tag), 'pattern' => @pattern, 'target_tag' => tag, 'target_key' => key, 'check_type' => 'numeric_downward', 'level' => 'warn', 'threshold' => @warn_threshold, 'value' => value, 'message_time' => Time.at(time).to_s } else nil end when :find str = record[key].to_s if match(@crit_regexp, str) { :hashkey => @pattern + "\t" + tag + "\t" + key, :match_def => self, :emit_tag => (@tag_crit || @tag), 'pattern' => @pattern, 'target_tag' => tag, 'target_key' => key, 'check_type' => 'string_find', 'level' => 'crit', 'regexp' => @crit_regexp.inspect, 'value' => str, 'message_time' => Time.at(time).to_s } elsif match(@warn_regexp, str) { :hashkey => @pattern + "\t" + tag + "\t" + key, :match_def => self, :emit_tag => (@tag_warn || @tag), 'pattern' => @pattern, 'target_tag' => tag, 'target_key' => key, 'check_type' => 'string_find', 'level' => 'warn', 'regexp' => @warn_regexp.inspect, 'value' => str, 'message_time' => Time.at(time).to_s } else nil end else raise ArgumentError, "unknown check type (maybe bug): #{@check}" end end def match(regexp,string) regexp && regexp.match(string) rescue ArgumentError => e raise e unless e.message.index("invalid byte sequence in") == 0 replaced_string = replace_invalid_byte(string) regexp.match(replaced_string) end def replace_invalid_byte(string) replace_options = { invalid: :replace, undef: :replace, replace: '?' } temporal_encoding = (string.encoding == Encoding::UTF_8 ? Encoding::UTF_16BE : Encoding::UTF_8) string.encode(temporal_encoding, string.encoding, replace_options).encode(string.encoding) end end class State # level: :warn, :crit # stage: 0(1st)/1(2nd)/2(3rd) attr_accessor :pattern, :target_tag, :target_key, :level, :stage, :counter, :first_notified, :last_notified def initialize(notification) @pattern = notification[:pattern] @target_tag = notification[:target_tag] @target_key = notification[:target_key] @level = notification['level'] @stage = 0 @counter = 1 t = Fluent::Engine.now @first_notified = t @last_notified = t end def suppress?(definition, notification) if @level == notification['level'] (Fluent::Engine.now - @last_notified) <= definition.intervals[@stage] else true end end def update_notified(definition, notification) t = Fluent::Engine.now if @level == notification['level'] rep = definition.repetitions[@stage] if rep and rep > 0 @counter += 1 if @counter > rep @stage += 1 @counter = 0 end end else @level = notification['level'] @stage = 0 @counter = 1 @first_notified = t end @last_notified = t end end end