require 'helper' require 'fluent/test/driver/output' class NumericMonitorOutputTest < Test::Unit::TestCase def setup Fluent::Test.setup end CONFIG = %[ unit minute tag monitor.test input_tag_remove_prefix test monitor_key field1 percentiles 80,90 ] def create_driver(conf = CONFIG) Fluent::Test::Driver::Output.new(Fluent::Plugin::NumericMonitorOutput).configure(conf) end def test_configure assert_raise(Fluent::ConfigError) { create_driver('') } assert_raise(Fluent::ConfigError) { create_driver CONFIG + %[ output_per_tag true ] } assert_nothing_raised { create_driver CONFIG + %[ output_per_tag true @label yay ] } d = create_driver(CONFIG) assert_equal(60, d.instance.count_interval) assert_equal(:minute, d.instance.unit) assert_equal("monitor.test", d.instance.tag) assert_nil(d.instance.tag_prefix) assert_false(d.instance.output_per_tag) assert_equal(:tag, d.instance.aggregate) assert_equal("test", d.instance.input_tag_remove_prefix) assert_equal("field1", d.instance.monitor_key) assert_equal([80, 90], d.instance.percentiles) end sub_test_case "#count_initialized" do test "aggregate all" do d = create_driver(CONFIG + %[aggregate all]) all = {"all" => {min: nil, max: nil, sum: nil, num: 0, sample: []}} assert_equal(all, d.instance.count_initialized) end test "default" do d = create_driver(CONFIG) assert_equal({}, d.instance.count_initialized) end test "with keys" do d = create_driver(CONFIG) tag1 = "tag1" tag2 = "tag2" expected = {tag1 => {min: nil, max: nil, sum: nil, num: 0, sample: []}, tag2 => {min: nil, max: nil, sum: nil, num: 0, sample: []}} assert_equal(expected, d.instance.count_initialized([tag1, tag2])) end end def test_countups #TODO end def test_stripped_tag d = create_driver assert_equal 'input', d.instance.stripped_tag('test.input') assert_equal 'test.input', d.instance.stripped_tag('test.test.input') assert_equal 'input', d.instance.stripped_tag('input') end def test_generate_output #TODO end def test_emit d1 = create_driver(CONFIG) d1.run(default_tag: 'test.tag1') do 10.times do d1.feed({'field1' => 0}) d1.feed({'field1' => '1'}) d1.feed({'field1' => 2}) d1.feed({'field1' => '3'}) d1.feed({'field1' => 4}) d1.feed({'field1' => 5}) d1.feed({'field1' => 6}) d1.feed({'field1' => 7}) d1.feed({'field1' => 8}) d1.feed({'field1' => 9}) end end r1 = d1.instance.flush assert_equal 0, r1['tag1_min'] assert_equal 9, r1['tag1_max'] assert_equal 4.5, r1['tag1_avg'] assert_equal 450, r1['tag1_sum'] assert_equal 7, r1['tag1_percentile_80'] assert_equal 8, r1['tag1_percentile_90'] assert_equal 100, r1['tag1_num'] end def test_emit_aggregate_all d1 = create_driver(%[ unit minute tag monitor.test input_tag_remove_prefix test aggregate all monitor_key field1 percentiles 80,90 ]) d1.run(default_tag: 'test.tag1') do 10.times do d1.feed({'field1' => 0}) d1.feed({'field1' => '1'}) d1.feed({'field1' => 2}) d1.feed({'field1' => '3'}) d1.feed({'field1' => 4}) d1.feed({'field1' => 5}) d1.feed({'field1' => 6}) d1.feed({'field1' => 7}) d1.feed({'field1' => 8}) d1.feed({'field1' => 9}) end end r1 = d1.instance.flush assert_equal 0, r1['min'] assert_equal 9, r1['max'] assert_equal 4.5, r1['avg'] assert_equal 450, r1['sum'] assert_equal 7, r1['percentile_80'] assert_equal 8, r1['percentile_90'] assert_equal 100, r1['num'] end def test_without_percentiles d = create_driver(%[ unit minute tag testmonitor monitor_key x1 ]) d.run(default_tag: 'test') do d.feed({'x1' => 1}) d.feed({'x1' => 2}) d.feed({'x1' => 3}) end r = d.instance.flush assert_equal 1, r['test_min'] assert_equal 3, r['test_max'] assert_equal 2, r['test_avg'] assert_equal 6, r['test_sum'] assert_equal 3, r['test_num'] end def test_output_per_tag d = create_driver(CONFIG + %[ aggregate tag output_per_tag true tag_prefix tag_prefix ]) time = Time.now.to_i d.run(default_tag: 'tag') do tag1 = 'tag1' d.feed(tag1, time, {'field1' => 1}) d.feed(tag1, time, {'field1' => 2}) d.feed(tag1, time, {'field1' => 3}) tag2 = 'tag2' d.feed(tag2, time, {'field1' => 1}) d.feed(tag2, time, {'field1' => 2}) d.feed(tag2, time, {'field1' => 3}) d.instance.flush_emit end assert_equal 2, d.events.size tag, r = d.events[0][0], d.events[0][2] assert_equal 'tag_prefix.tag1', tag assert_equal 1, r['min'] assert_equal 3, r['max'] assert_equal 2, r['avg'] assert_equal 6, r['sum'] assert_equal 3, r['num'] tag, r = d.events[1][0], d.events[1][2] assert_equal 'tag_prefix.tag2', tag assert_equal 1, r['min'] assert_equal 3, r['max'] assert_equal 2, r['avg'] assert_equal 6, r['sum'] assert_equal 3, r['num'] d = create_driver(CONFIG + %[ aggregate tag output_per_tag false tag output_tag ]) time = Time.now.to_i d.run(default_tag: 'tag') do tag1 = 'tag1' d.feed(tag1, time, {'field1' => 1}) d.feed(tag1, time, {'field1' => 2}) d.feed(tag1, time, {'field1' => 3}) tag2 = 'tag2' d.feed(tag2, time, {'field1' => 1}) d.feed(tag2, time, {'field1' => 2}) d.feed(tag2, time, {'field1' => 3}) d.instance.flush_emit end assert_equal 1, d.events.size tag, r = d.events[0][0], d.events[0][2] assert_equal 'output_tag', tag assert_equal 1, r['tag1_min'] assert_equal 3, r['tag1_max'] assert_equal 2, r['tag1_avg'] assert_equal 6, r['tag1_sum'] assert_equal 3, r['tag1_num'] assert_equal 1, r['tag2_min'] assert_equal 3, r['tag2_max'] assert_equal 2, r['tag2_avg'] assert_equal 6, r['tag1_sum'] assert_equal 3, r['tag2_num'] d = create_driver(CONFIG + %[ aggregate all output_per_tag true tag_prefix tag_prefix ]) time = Time.now.to_i d.run(default_tag: 'tag') do tag1 = 'tag1' d.feed(tag1, time, {'field1' => 1}) d.feed(tag1, time, {'field1' => 2}) d.feed(tag1, time, {'field1' => 3}) tag2 = 'tag2' d.feed(tag2, time, {'field1' => 1}) d.feed(tag2, time, {'field1' => 2}) d.feed(tag2, time, {'field1' => 3}) d.instance.flush_emit end assert_equal 1, d.events.size tag = d.events[0][0] r = d.events[0][2] assert_equal 'tag_prefix.all', tag assert_equal 1, r['min'] assert_equal 3, r['max'] assert_equal 2, r['avg'] assert_equal 12, r['sum'] assert_equal 6, r['num'] d = create_driver(CONFIG + %[ aggregate all output_per_tag false tag output_tag ]) time = Time.now.to_i d.run(default_tag: 'tag') do tag1 = 'tag1' d.feed(tag1, time, {'field1' => 1}) d.feed(tag1, time, {'field1' => 2}) d.feed(tag1, time, {'field1' => 3}) tag2 = 'tag2' d.feed(tag2, time, {'field1' => 1}) d.feed(tag2, time, {'field1' => 2}) d.feed(tag2, time, {'field1' => 3}) d.instance.flush_emit end assert_equal 1, d.events.size tag = d.events[0][0] r = d.events[0][2] assert_equal 'output_tag', tag assert_equal 1, r['min'] assert_equal 3, r['max'] assert_equal 2, r['avg'] assert_equal 12, r['sum'] assert_equal 6, r['num'] end def test_output_key_prefix d = create_driver(CONFIG + %[ aggregate tag output_per_tag true tag_prefix tag_prefix output_key_prefix key_prefix ]) time = Time.now.to_i d.run(default_tag: 'tag') do tag1 = 'tag1' d.feed(tag1, time, {'field1' => 1}) d.feed(tag1, time, {'field1' => 2}) d.feed(tag1, time, {'field1' => 3}) tag2 = 'tag2' d.feed(tag2, time, {'field1' => 1}) d.feed(tag2, time, {'field1' => 2}) d.feed(tag2, time, {'field1' => 3}) d.instance.flush_emit end assert_equal 2, d.events.size tag, r = d.events[0][0], d.events[0][2] assert_equal 'tag_prefix.tag1', tag assert_equal 1, r['key_prefix_min'] assert_equal 3, r['key_prefix_max'] assert_equal 2, r['key_prefix_avg'] assert_equal 6, r['key_prefix_sum'] assert_equal 3, r['key_prefix_num'] tag, r = d.events[1][0], d.events[1][2] assert_equal 'tag_prefix.tag2', tag assert_equal 1, r['key_prefix_min'] assert_equal 3, r['key_prefix_max'] assert_equal 2, r['key_prefix_avg'] assert_equal 6, r['key_prefix_sum'] assert_equal 3, r['key_prefix_num'] d = create_driver(CONFIG + %[ aggregate tag output_per_tag false tag output_tag output_key_prefix key_prefix ]) time = Time.now.to_i d.run(default_tag: 'tag') do tag1 = 'tag1' d.feed(tag1, time, {'field1' => 1}) d.feed(tag1, time, {'field1' => 2}) d.feed(tag1, time, {'field1' => 3}) tag2 = 'tag2' d.feed(tag2, time, {'field1' => 1}) d.feed(tag2, time, {'field1' => 2}) d.feed(tag2, time, {'field1' => 3}) d.instance.flush_emit end assert_equal 1, d.events.size tag, r = d.events[0][0], d.events[0][2] assert_equal 'output_tag', tag assert_equal 1, r['key_prefix_tag1_min'] assert_equal 3, r['key_prefix_tag1_max'] assert_equal 2, r['key_prefix_tag1_avg'] assert_equal 6, r['key_prefix_tag1_sum'] assert_equal 3, r['key_prefix_tag1_num'] assert_equal 1, r['key_prefix_tag2_min'] assert_equal 3, r['key_prefix_tag2_max'] assert_equal 2, r['key_prefix_tag2_avg'] assert_equal 6, r['key_prefix_tag2_sum'] assert_equal 3, r['key_prefix_tag2_num'] d = create_driver(CONFIG + %[ aggregate all output_per_tag true tag_prefix tag_prefix output_key_prefix key_prefix ]) time = Time.now.to_i d.run(default_tag: 'tag1') do tag1 = 'tag1' d.feed(tag1, time, {'field1' => 1}) d.feed(tag1, time, {'field1' => 2}) d.feed(tag1, time, {'field1' => 3}) tag2 = 'tag2' d.feed(tag2, time, {'field1' => 1}) d.feed(tag2, time, {'field1' => 2}) d.feed(tag2, time, {'field1' => 3}) d.instance.flush_emit end assert_equal 1, d.events.size tag = d.events[0][0] r = d.events[0][2] assert_equal 'tag_prefix.all', tag assert_equal 1, r['key_prefix_min'] assert_equal 3, r['key_prefix_max'] assert_equal 2, r['key_prefix_avg'] assert_equal 12, r['key_prefix_sum'] assert_equal 6, r['key_prefix_num'] d = create_driver(CONFIG + %[ aggregate all output_per_tag false tag output_tag output_key_prefix key_prefix ]) time = Time.now.to_i d.run(default_tag: 'tag') do tag1 = 'tag1' d.feed(tag1, time, {'field1' => 1}) d.feed(tag1, time, {'field1' => 2}) d.feed(tag1, time, {'field1' => 3}) tag2 = 'tag2' d.feed(tag2, time, {'field1' => 1}) d.feed(tag2, time, {'field1' => 2}) d.feed(tag2, time, {'field1' => 3}) d.instance.flush_emit end assert_equal 1, d.events.size tag = d.events[0][0] r = d.events[0][2] assert_equal 'output_tag', tag assert_equal 1, r['key_prefix_min'] assert_equal 3, r['key_prefix_max'] assert_equal 2, r['key_prefix_avg'] assert_equal 12, r['key_prefix_sum'] assert_equal 6, r['key_prefix_num'] end end