require 'helper'

class AnomalyDetectOutputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
  end

  CONFIG = %[
    tag test.anomaly
    outlier_term 28
    outlier_discount 0.05
    score_term 28
    score_discount 0.05
    tick 10
    smooth_term 3
    target y
  ]

  def create_driver (conf=CONFIG, tag="debug.anomaly")
    Fluent::Test::OutputTestDriver.new(Fluent::AnomalyDetectOutput, tag).configure(conf)
  end

  def test_configure
    d = create_driver('')
    assert_equal 28, d.instance.outlier_term
    assert_equal 0.05, d.instance.outlier_discount
    assert_equal 7, d.instance.smooth_term
    assert_equal 14, d.instance.score_term
    assert_equal 0.1, d.instance.score_discount
    assert_equal 300, d.instance.tick
    assert_nil d.instance.target
    assert_equal 'anomaly', d.instance.tag

    d = create_driver
    assert_equal 28, d.instance.outlier_term
    assert_equal 0.05, d.instance.outlier_discount
    assert_equal 3, d.instance.smooth_term
    assert_equal 28, d.instance.score_term
    assert_equal 0.05, d.instance.score_discount
    assert_equal 10, d.instance.tick
    assert_equal "y", d.instance.target
    assert_equal 'test.anomaly', d.instance.tag

    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        outlier_discount 1.3
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        score_discount 1.3
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        outlier_discount -0.3
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        score_discount -0.3
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        outlier_term 0
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        score_term 0
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        smooth_term 0
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        tick 0
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        target y
        targets x,y,z
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        threshold 1.0
        thresholds 1.0,2.0
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        thresholds 1,2
      ]
    }
    assert_raise(Fluent::ConfigError) {
      d = create_driver %[
        targets x,y,z
        thresholds 1
      ]
    }
  end

  def test_emit_record_count
    d = create_driver %[
      tag test.anomaly
      outlier_term 28
      outlier_discount 0.05
      score_term 28
      score_discount 0.05
      tick 10
      smooth_term 3
    ]

    data = 10.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        (0..val - 1).each do ||
          d.emit({'y' => 1})
        end
        r = d.instance.flush[:all]
        assert_equal val, r['target']
      end
    end
  end

  def test_emit_target
    d = create_driver %[
      tag test.anomaly
      outlier_term 28
      outlier_discount 0.05
      score_term 28
      score_discount 0.05
      tick 10
      smooth_term 3
      target y
    ]

    data = 10.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        d.emit({'y' => val})
        r = d.instance.flush[:all]
        assert_equal val, r['target']
      end
    end
  end

  def test_emit_when_target_does_not_exist
    d = create_driver %[
      tag test.anomaly
      outlier_term 28
      outlier_discount 0.05
      score_term 28
      score_discount 0.05
      tick 10
      smooth_term 3
      target y
    ]

    d.run do
      10.times do
        d.emit({'foobar' => 999.99})
        r = d.instance.flush[:all]
        assert_equal nil, r
      end
    end
  end

  def test_emit_stock_data
    require 'csv'
    reader = CSV.open("test/stock.2432.csv", "r")
    header = reader.take(1)[0]
    d = create_driver
    d.run do
      reader.each_with_index do |row, idx|
        break if idx > 5
        d.emit({'y' => row[4].to_i})
        r = d.instance.flush[:all]
        assert r['target']
        assert r['outlier']
        assert r['score']
      end
    end
  end

  def test_store_file
    dir = "test/tmp"
    Dir.mkdir dir unless Dir.exist? dir
    file = "#{dir}/test.dat"
    File.unlink file if File.exist? file

    d = create_driver %[
      store_file #{file}
    ]

    d.run do
      assert_equal([], d.instance.outlier_bufs(:all))
      d.emit({'x' => 1})
      d.emit({'x' => 1})
      d.emit({'x' => 1})
      d.instance.flush[:all]
      d.emit({'x' => 1})
      d.emit({'x' => 1})
      d.emit({'x' => 1})
      d.instance.flush[:all]
    end
    assert File.exist? file

    d2 = create_driver %[
      store_file #{file}
    ]
    d2.run do
      assert_equal 2, d2.instance.outlier_bufs(:all).size
    end

    File.unlink file
  end

  def test_set_large_threshold
    require 'csv'
    reader = CSV.open("test/stock.2432.csv", "r")
    header = reader.take(1)[0]
    d = create_driver %[
      threshold 1000
    ]
    d.run do
      reader.each_with_index do |row, idx|
        break if idx > 5
        d.emit({'y' => row[4].to_i})
        r = d.instance.flush[:all]
        assert_equal nil, r
      end
    end
  end

  def test_set_small_threshold
    require 'csv'
    reader = CSV.open("test/stock.2432.csv", "r")
    header = reader.take(1)[0]
    d = create_driver %[
      threshold 1
    ]
    d.run do
      reader.each_with_index do |row, idx|
        break if idx > 5
        d.emit({'y' => row[4].to_i})
        r = d.instance.flush[:all]
        assert_not_equal nil, r
      end
    end
  end

  def test_up_trend
    d = create_driver %[
      target y
      trend up
    ]

    # should not output in down trend
    d.run do
      d.emit({'y' => 0.0}); d.instance.flush
      d.emit({'y' => 0.0}); d.instance.flush
      d.emit({'y' => 0.0}); d.instance.flush
      d.emit({'y' => -1.0}); r = d.instance.flush[:all]
      assert_equal nil, r
    end

    # should output in up trend
    d.run do
      d.emit({'y' => -1.0}); d.instance.flush
      d.emit({'y' => -1.0}); d.instance.flush
      d.emit({'y' => -1.0}); d.instance.flush
      d.emit({'y' => 0.0}); r = d.instance.flush[:all]
      assert_not_equal nil, r
    end
  end

  def test_down_trend
    d = create_driver %[
      target y
      trend down
    ]
    # should output in down trend
    d.run do
      d.emit({'y' => 0.0}); d.instance.flush
      d.emit({'y' => 0.0}); d.instance.flush
      d.emit({'y' => 0.0}); d.instance.flush
      d.emit({'y' => -1.0}); r = d.instance.flush[:all]
      assert_not_equal nil, r
    end

    # should not output in up tread
    d.run do
      d.emit({'y' => -1.0}); d.instance.flush
      d.emit({'y' => -1.0}); d.instance.flush
      d.emit({'y' => -1.0}); d.instance.flush
      d.emit({'y' => 0.0})
      r = d.instance.flush[:all]
      assert_equal nil, r
    end
  end

  def test_aggregate_tag
    d = create_driver %[
      outlier_term 28
      outlier_discount 0.05
      score_term 28
      score_discount 0.05
      tick 10
      smooth_term 3
      aggregate tag
      add_tag_prefix test
    ]

    data = 10.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        (0..val - 1).each do ||
          d.emit({'y' => 1})
        end
        r = d.instance.flush['debug.anomaly']
        assert_equal val, r['target']
      end
    end
  end

  def test_targets
    d = create_driver %[
      targets x,y
    ]
    data = 10.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        d.emit({'x' => val, 'y' => val})
        r = d.instance.flush[:all]
        assert_equal val, r['x']
        assert_equal val, r['y']
      end
    end
  end

  def test_targets_default_suffix
    d = create_driver %[
      targets x,y
    ]
    data = 1.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        d.emit({'x' => val, 'y' => val})
        r = d.instance.flush[:all]
        assert r.has_key?('x')
        assert r.has_key?('y')
        assert r.has_key?('x_outlier')
        assert r.has_key?('x_score')
        assert r.has_key?('y_outlier')
        assert r.has_key?('y_score')
      end
    end
  end

  def test_targets_suffix
    d = create_driver %[
      targets x,y
      outlier_suffix
      score_suffix _anomaly
      target_suffix _target
    ]
    data = 1.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        d.emit({'x' => val, 'y' => val})
        r = d.instance.flush[:all]
        assert r.has_key?('x_target')
        assert r.has_key?('y_target')
        assert r.has_key?('x')
        assert r.has_key?('x_anomaly')
        assert r.has_key?('y')
        assert r.has_key?('y_anomaly')
      end
    end
  end

  def test_targets_thresholds
    d = create_driver %[
      targets x,y
      thresholds 1,2
    ]
    d.run do
      thresholds = d.instance.thresholds
      assert_equal 1, thresholds['x']
      assert_equal 2, thresholds['y']

      threshold_proc = d.instance.threshold_proc
      assert_equal 1, threshold_proc.call('x')
      assert_equal 2, threshold_proc.call('y')
    end
  end

  def test_suppress_tick
    d = create_driver %[
      tick 10
      suppress_tick 30
      target y
    ]

    data = 10.times.map { (rand * 100).to_i } + [0]
    d.run do
      data.each do |val|
        d.emit({'y' => val})
        r = d.instance.flush[:all]
        assert_equal nil, r
      end
    end
  end
end