require 'helper'
require 'csv'

# Tests for Fluent::AnomalyDetectOutput: configuration defaults and validation,
# the records returned by #flush, persistence via store_file, and the
# threshold / trend options.
class AnomalyDetectOutputTest < Test::Unit::TestCase
  # Fixture read by the stock-data and threshold tests (first row is a header).
  STOCK_CSV = "test/stock.2432.csv"

  def setup
    Fluent::Test.setup
  end

  CONFIG = %[
    tag test.anomaly
    outlier_term 28
    outlier_discount 0.05
    score_term 28
    score_discount 0.05
    tick 10
    smooth_term 3
    target y
  ]

  # Builds an output test driver for the plugin configured with +conf+.
  def create_driver(conf = CONFIG, tag = "debug.anomaly")
    Fluent::Test::OutputTestDriver.new(Fluent::AnomalyDetectOutput, tag).configure(conf)
  end

  # Emits {'y' => value} on +d+, flushes the plugin and returns the flush result.
  def emit_and_flush(d, value)
    d.emit({'y' => value})
    d.instance.flush
  end

  # Feeds the first +rows+ data rows of STOCK_CSV (column index 4, as integer)
  # to +d+ one at a time and yields the flush result after each emit.
  # The block form of CSV.open guarantees the file handle is closed.
  def each_stock_flush(d, rows = 6)
    CSV.open(STOCK_CSV, "r") do |reader|
      reader.shift # discard the header row
      reader.first(rows).each do |row|
        yield emit_and_flush(d, row[4].to_i)
      end
    end
  end

  def test_configure
    # Empty configuration: every parameter falls back to its default.
    d = create_driver('')
    assert_equal 28, d.instance.outlier_term
    assert_equal 0.05, d.instance.outlier_discount
    assert_equal 7, d.instance.smooth_term
    assert_equal 14, d.instance.score_term
    assert_equal 0.1, d.instance.score_discount
    assert_equal 300, d.instance.tick
    assert_nil d.instance.target
    assert_equal 'anomaly', d.instance.tag
    assert d.instance.record_count

    # Explicit configuration (CONFIG) overrides the defaults.
    d = create_driver
    assert_equal 28, d.instance.outlier_term
    assert_equal 0.05, d.instance.outlier_discount
    assert_equal 3, d.instance.smooth_term
    assert_equal 28, d.instance.score_term
    assert_equal 0.05, d.instance.score_discount
    assert_equal 10, d.instance.tick
    assert_equal "y", d.instance.target
    assert_equal 'test.anomaly', d.instance.tag
    assert !d.instance.record_count

    # Out-of-range values must be rejected at configure time.
    assert_raise(Fluent::ConfigError) { create_driver %[ outlier_discount 1.3 ] }
    assert_raise(Fluent::ConfigError) { create_driver %[ score_discount 1.3 ] }
    assert_raise(Fluent::ConfigError) { create_driver %[ outlier_discount -0.3 ] }
    assert_raise(Fluent::ConfigError) { create_driver %[ score_discount -0.3 ] }
    assert_raise(Fluent::ConfigError) { create_driver %[ outlier_term 0 ] }
    assert_raise(Fluent::ConfigError) { create_driver %[ score_term 0 ] }
    assert_raise(Fluent::ConfigError) { create_driver %[ smooth_term 0 ] }
    assert_raise(Fluent::ConfigError) { create_driver %[ tick 0 ] }
  end

  def test_array_init
    d = create_driver
    assert_equal [], d.instance.outlier_buf
    assert_nil d.instance.records # @records is initialized at start, not configure
  end

  def test_sdar
    d = create_driver
    assert_instance_of Fluent::ChangeFinder, d.instance.outlier
    assert_instance_of Fluent::ChangeFinder, d.instance.score
  end

  # Without `target`, the flushed 'target' value is the number of records
  # emitted since the previous flush.
  def test_emit_record_count
    d = create_driver %[
      tag test.anomaly
      outlier_term 28
      outlier_discount 0.05
      score_term 28
      score_discount 0.05
      tick 10
      smooth_term 3
    ]

    # Trailing 0 covers the "no records in this tick" case.
    data = 10.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        val.times { d.emit({'y' => 1}) }
        r = d.instance.flush
        assert_equal val, r['target']
      end
    end
  end

  # With `target y`, the flushed 'target' value is the emitted 'y' value.
  def test_emit_target
    d = create_driver %[
      tag test.anomaly
      outlier_term 28
      outlier_discount 0.05
      score_term 28
      score_discount 0.05
      tick 10
      smooth_term 3
      target y
    ]

    data = 10.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        assert_equal val, emit_and_flush(d, val)['target']
      end
    end
  end

  # Records lacking the configured target key produce no flush output.
  def test_emit_when_target_does_not_exist
    d = create_driver %[
      tag test.anomaly
      outlier_term 28
      outlier_discount 0.05
      score_term 28
      score_discount 0.05
      tick 10
      smooth_term 3
      target y
    ]

    d.run do
      10.times do
        d.emit({'foobar' => 999.99})
        assert_nil d.instance.flush
      end
    end
  end

  def test_emit_stock_data
    d = create_driver
    d.run do
      each_stock_flush(d) do |r|
        assert r['target']
        assert r['outlier']
        assert r['score']
      end
    end
  end

  # State written to store_file by one driver run is visible to a second
  # driver configured with the same file.
  def test_store_file
    dir = "test/tmp"
    Dir.mkdir dir unless Dir.exist? dir
    file = "#{dir}/test.dat"
    File.unlink file if File.exist? file

    d = create_driver %[
      store_file #{file}
    ]
    d.run do
      assert_equal [], d.instance.outlier_buf
      2.times do
        3.times { d.emit({'x' => 1}) }
        d.instance.flush
      end
    end
    assert File.exist? file

    d2 = create_driver %[
      store_file #{file}
    ]
    d2.run do
      # Two flushes were performed by the first driver.
      assert_equal 2, d2.instance.outlier_buf.size
    end
  ensure
    # Remove the state file even when an assertion above fails.
    File.unlink file if file && File.exist?(file)
  end

  def test_set_large_threshold
    d = create_driver %[
      threshold 1000
    ]
    d.run do
      each_stock_flush(d) { |r| assert_nil r }
    end
  end

  def test_set_small_threshold
    d = create_driver %[
      threshold 1
    ]
    d.run do
      each_stock_flush(d) { |r| assert_not_nil r }
    end
  end

  def test_up_trend
    d = create_driver %[
      target y
      trend up
    ]

    # should not output in down trend
    d.run do
      3.times { emit_and_flush(d, 0.0) }
      assert_nil emit_and_flush(d, -1.0)
    end

    # should output in up trend
    d.run do
      3.times { emit_and_flush(d, -1.0) }
      assert_not_nil emit_and_flush(d, 0.0)
    end
  end

  def test_down_trend
    d = create_driver %[
      target y
      trend down
    ]

    # should output in down trend
    d.run do
      3.times { emit_and_flush(d, 0.0) }
      assert_not_nil emit_and_flush(d, -1.0)
    end

    # should not output in up trend
    d.run do
      3.times { emit_and_flush(d, -1.0) }
      assert_nil emit_and_flush(d, 0.0)
    end
  end
end