# -*- coding: utf-8 -*-
require 'helper'
require 'redis'

# Tests for Fluent::QueryCombinerOutput.
#
# The tests build a BufferedOutputTestDriver for the plugin, emit records and
# inspect what the driver collected in `d.emits`. A Redis client connected to
# localhost:6379 (db 0) is opened for every test and closed afterwards.
#
# NOTE(review): the copy of this file under review had lost every
# angle-bracket token (the comments read "Must have and conditions").
# The section tags <catch>, <dump>, <release> and <prolong> below were
# reconstructed from the test names and the `time_catch` / `time_dump` /
# `time_release` field names -- confirm against the plugin README.
class QueryCombinerOutputTest < Test::Unit::TestCase
  # Number of times tryOnRedis retries a command that raised
  # Redis::CommandError before giving up.
  REDIS_RETRY = 3

  CONFIG = %[
    query_identify event_id
    <catch>
      condition status == 'start'
      replace time => time_start
    </catch>
    <dump>
      condition status == 'finish'
      replace time => time_finish
    </dump>
  ]

  # Resets the Fluentd test environment and opens the Redis connection used
  # by tryOnRedis.
  def setup
    Fluent::Test.setup
    @redis = Redis.new(:host => 'localhost', :port => 6379,
                       :thread_safe => true, :db => 0)
  end

  # Closes the Redis connection opened in setup.
  def teardown
    @redis.quit
  end

  # Sends `method` with `args` to the Redis client, retrying on
  # Redis::CommandError up to REDIS_RETRY times.
  #
  # Returns the command's result, or nil when the client does not respond to
  # `method`. When every retry fails, a warning is logged and the original
  # Redis::CommandError is re-raised.
  def tryOnRedis(method, *args)
    tries = 0
    begin
      @redis.public_send(method, *args) if @redis.respond_to?(method)
    rescue Redis::CommandError
      tries += 1
      retry if tries <= REDIS_RETRY
      $log.warn %Q[redis command retry failed : #{method}(#{args.join(', ')})]
      raise
    end
  end

  # Builds a buffered-output test driver for the plugin, configured with the
  # given configuration text.
  def create_driver(conf, tag='test')
    Fluent::Test::BufferedOutputTestDriver.new(Fluent::QueryCombinerOutput, tag).configure(conf)
  end

  def test_configure
    assert_raise(Fluent::ConfigError) { create_driver('') }

    # Must have <catch> and <dump> conditions
    assert_raise(Fluent::ConfigError) {
      create_driver %[
        query_identify event_id
      ]
    }
    assert_raise(Fluent::ConfigError) {
      create_driver %[
        query_identify event_id
        <catch>
          condition status == 'start'
        </catch>
      ]
    }
    assert_raise(Fluent::ConfigError) {
      create_driver %[
        query_identify event_id
        <dump>
          condition status == 'finish'
        </dump>
      ]
    }

    # `replace` configuration only allowed in <catch> and <dump>
    assert_raise(Fluent::ConfigError) {
      create_driver %[
        query_identify event_id
        <catch>
          condition status == 'start'
          replace hoge => hoge_start
        </catch>
        <release>
          condition status == 'error'
          replace hoge => hoge_error
        </release>
        <dump>
          condition status == 'finish'
          replace hoge => hoge_finish
        </dump>
      ]
    }

    # `time` configuration only allowed in <catch> and <dump>
    assert_raise(Fluent::ConfigError) {
      create_driver %[
        query_identify event_id
        <catch>
          condition status == 'start'
          replace hoge => hoge_start
          time time_catch
        </catch>
        <release>
          condition status == 'error'
          time time_release
        </release>
        <dump>
          condition status == 'finish'
          replace hoge => hoge_finish
          time time_dump
        </dump>
      ]
    }
  end

  def test_readme_sample_basic_example
    d = create_driver %[
      query_identify   event_id
      query_ttl        3600  # time to live[sec]
      buffer_size      1000  # queries

      <catch>
        condition      status == 'event-start'
      </catch>

      <dump>
        condition      status == 'event-finish'
      </dump>
    ]
    time = Time.now.to_i
    d.emit({"event_id"=>"01234567", "status"=>"event-start", "started_at"=>"2001-02-03T04:05:06Z"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-finish", "finished_at"=>"2001-02-03T04:05:06Z"}, time)
    d.run

    assert_equal 1, d.emits.length
    assert_equal({
      "event_id"=>"01234567",
      "status"=>"event-finish",
      "started_at"=>"2001-02-03T04:05:06Z",
      "finished_at"=>"2001-02-03T04:05:06Z"
    }, d.emits[0][2])
  end

  def test_readme_sample_replace_sentence
    d = create_driver %[
      query_identify   event_id
      query_ttl        3600  # time to live[sec]
      buffer_size      1000  # queries

      <catch>
        condition      status == 'event-start'
        replace        time => time_start
      </catch>

      <dump>
        condition      status == 'event-finish'
        replace        time => time_finish
      </dump>
    ]
    time = Time.now.to_i
    d.emit({"event_id"=>"01234567", "status"=>"event-start", "time"=>"2001-02-03T04:05:06Z"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-finish", "time"=>"2001-02-03T04:15:11Z"}, time)
    d.run

    assert_equal 1, d.emits.length
    assert_equal({
      "event_id"=>"01234567",
      "status"=>"event-finish",
      "time_start"=>"2001-02-03T04:05:06Z",
      "time_finish"=>"2001-02-03T04:15:11Z"
    }, d.emits[0][2])
  end

  def test_readme_sample_replace_multiple_fields
    d = create_driver %[
      query_identify   event_id
      query_ttl        3600  # time to live[sec]
      buffer_size      1000  # queries

      <catch>
        condition      status == 'event-start'
        replace        time => time_start, condition => condition_start
      </catch>

      <dump>
        condition      status == 'event-finish'
        replace        time => time_finish, condition => condition_end
      </dump>
    ]
    time = Time.now.to_i
    d.emit({"event_id"=>"01234567", "status"=>"event-start",
            "time"=>"2001-02-03T04:05:06Z", "condition"=>"bad"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-finish",
            "time"=>"2001-02-03T04:15:11Z", "condition"=>"excellent"}, time)
    d.run

    assert_equal 1, d.emits.length
    assert_equal({
      "event_id"=>"01234567",
      "status"=>"event-finish",
      "time_start"=>"2001-02-03T04:05:06Z",
      "condition_start"=>"bad",
      "time_finish"=>"2001-02-03T04:15:11Z",
      "condition_end"=>"excellent"
    }, d.emits[0][2])
  end

  def test_readme_sample_release
    d = create_driver %[
      query_identify   event_id
      query_ttl        3600  # time to live[sec]
      buffer_size      1000  # queries

      <catch>
        condition      status == 'event-start'
      </catch>

      <dump>
        condition      status == 'event-finish'
      </dump>

      <release>
        condition      status == 'event-error'
      </release>
    ]
    time = Time.now.to_i
    d.emit({"event_id"=>"01234567", "status"=>"event-start", "time"=>"2001-02-03T04:05:06Z"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-error", "time"=>"2001-02-03T04:05:06Z"}, time)
    d.run

    assert_equal 0, d.emits.length
  end

  def test_readme_sample_prolong
    d = create_driver %[
      query_identify   event_id
      query_ttl        3600  # time to live[sec]
      buffer_size      1000  # queries

      <catch>
        condition      status == 'event-start'
      </catch>

      <dump>
        condition      status == 'event-finish'
      </dump>

      <prolong>
        condition      status == 'event-continue'
      </prolong>

      <release>
        condition      status == 'event-error'
      </release>
    ]
    time = Time.now.to_i
    d.emit({"event_id"=>"01234567", "status"=>"event-start", "time"=>"2001-02-03T04:05:06Z"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-continue", "time"=>"2001-02-03T04:05:07Z"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-continue", "time"=>"2001-02-03T04:05:08Z"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-continue", "time"=>"2001-02-03T04:05:09Z"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-continue", "time"=>"2001-02-03T04:05:10Z"}, time)
    d.emit({"event_id"=>"01234567", "status"=>"event-finish", "time"=>"2001-02-03T04:05:11Z"}, time)
    d.run

    assert_equal 1, d.emits.length
    assert_equal({
      "event_id"=>"01234567",
      "status"=>"event-finish",
      "time"=>"2001-02-03T04:05:11Z"
    }, d.emits[0][2])
  end

  def test_simple_events
    d = create_driver CONFIG
    time = Time.now.to_i
    d.emit({"event_id"=>"001", "status"=>"start", "time"=>"21:00"}, time)
    d.emit({"event_id"=>"002", "status"=>"start", "time"=>"22:00"}, time)
    d.emit({"event_id"=>"001", "status"=>"finish", "time"=>"23:00"}, time)
    d.emit({"event_id"=>"002", "status"=>"finish", "time"=>"24:00"}, time)
    d.run

    assert_equal({
      "event_id"=>"001",
      "status"=>"finish",
      "time_start"=>"21:00",
      "time_finish"=>"23:00"
    }, d.emits[0][2])
    assert_equal({
      "event_id"=>"002",
      "status"=>"finish",
      "time_start"=>"22:00",
      "time_finish"=>"24:00"
    }, d.emits[1][2])
  end

  def test_catch_dump_release
    d = create_driver %[
      buffer_size 1001
      query_identify event_id
      <catch>
        condition status == 'start'
        replace time => time_start
      </catch>
      <dump>
        condition status == 'finish'
        replace time => time_finish
      </dump>
      <release>
        condition status == 'error'
      </release>
    ]

    # Local helper: a lambda instead of a nested `def`, so no method is
    # (re)defined on the test class while the test runs.
    emit = lambda do |event_id, status, t|
      d.emit({"event_id"=>event_id, "status"=>status, "time"=>t}, Time.now.to_i)
    end

    (0..1000).each { |num| emit.call(num, "start", "21:00") }

    # Each started event is randomly either finished or errored; only the
    # finished ids are remembered, in emission order.
    finish_list = []
    (0..1000).each do |num|
      status = rand >= 0.5 ? "finish" : "error"
      finish_list.push(num) if status == "finish"
      emit.call(num, status, "22:00")
    end
    d.run

    # Check the count first so a shortfall fails as an assertion rather than
    # as a NoMethodError on a missing d.emits entry.
    assert_equal finish_list.size, d.emits.size
    finish_list.each_with_index do |num, index|
      assert_equal({
        "event_id" => num,
        "status" => "finish",
        "time_start" => "21:00",
        "time_finish" => "22:00",
      }, d.emits[index][2])
    end
  end

  def test_multi_query_identifier
    d = create_driver %[
      buffer_size 1001
      query_identify aid, bid, cid
      <catch>
        condition status == 'start'
      </catch>
      <dump>
        condition status == 'finish'
      </dump>
    ]

    emit = lambda do |aid, bid, cid, status, t|
      d.emit(
        {"aid"=>aid, "bid"=>bid, "cid"=>cid, "status"=>status, "time"=>t},
        Time.now.to_i
      )
    end

    # NOTE(review): the random id triples are not guaranteed to be unique;
    # a collision would presumably break the one-to-one start/finish pairing
    # asserted below -- consider deterministic ids.
    finish_list = []
    (0..1000).each do
      ids = [rand(1000), rand(1000), rand(1000)]
      emit.call(ids[0], ids[1], ids[2], "start", "22:00")
      finish_list.push(ids)
    end

    t_list = []
    finish_list.each do |ids|
      t = rand(100_000)
      emit.call(ids[0], ids[1], ids[2], "finish", t)
      t_list.push(t)
    end
    d.run

    assert_equal finish_list.size, d.emits.size
    finish_list.each_with_index do |ids, index|
      assert_equal({
        "aid" => ids[0],
        "bid" => ids[1],
        "cid" => ids[2],
        "status" => "finish",
        "time" => t_list[index]
      }, d.emits[index][2])
    end
  end

  def test_time_format_and_configuration
    d = create_driver %[
      query_identify   event_id
      query_ttl        3600  # time to live[sec]
      buffer_size      1000  # queries
      time_format      Time.at($time).iso8601(3)

      <catch>
        condition      status == 'event-start'
        time           time-catch
      </catch>

      <dump>
        condition      status == 'event-finish'
        time           time-dump
      </dump>

      <prolong>
        condition      status == 'event-continue'
      </prolong>

      <release>
        condition      status == 'event-error'
      </release>
    ]

    emit = lambda do |status|
      d.emit({"event_id"=>"01234567", "status"=>status}, Time.now.to_f)
    end

    emit.call("event-start")
    emit.call("event-continue")
    emit.call("event-continue")
    emit.call("event-continue")
    emit.call("event-finish")
    d.run

    assert_equal 1, d.emits.length
    assert_not_nil d.emits[0][2]['time-catch']
    assert_not_nil d.emits[0][2]['time-dump']
  end

  # TODO: not implemented yet -- currently an empty placeholder.
  def test_query_ttl
  end

  # TODO: not implemented yet -- currently an empty placeholder.
  def test_buffer_size
  end

  def test_continuous_dump
    d = create_driver %[
      redis_key_prefix test_query_combiner:
      query_identify   event_id
      query_ttl        5     # time to live[sec]
      buffer_size      1000  # queries
      continuous_dump  true
      remove_interval  5

      <catch>
        condition      status == 'event-start'
      </catch>

      <dump>
        condition      status == 'event-finish'
      </dump>

      <release>
        condition      status == 'event-error'
      </release>
    ]

    # NOTE(review): newer redis-rb releases return an Integer from `exists`
    # and provide `exists?` for the boolean form -- confirm which client
    # version this suite is pinned to before changing these assertions.
    assert_equal false, (tryOnRedis 'exists', "test_query_combiner:id000")

    d.emit({"event_id"=>"id000", "status"=>"event-start", "key_init"=>"init"}, Time.now.to_f)
    d.emit({"event_id"=>"id000", "status"=>"event-finish"}, Time.now.to_f)
    d.emit({"event_id"=>"id000", "status"=>"event-finish", "key_fin1"=>"fin1"}, Time.now.to_f)
    d.emit({"event_id"=>"id000", "status"=>"event-finish", "key_fin2"=>"fin2"}, Time.now.to_f)
    d.run

    assert_equal 3, d.emits.length
    assert_equal({"event_id"=>"id000", "status"=>"event-finish", "key_init"=>"init"},
                 d.emits[0][2])
    assert_equal({"event_id"=>"id000", "status"=>"event-finish", "key_init"=>"init", "key_fin1"=>"fin1"},
                 d.emits[1][2])
    assert_equal({"event_id"=>"id000", "status"=>"event-finish", "key_init"=>"init", "key_fin2"=>"fin2"},
                 d.emits[2][2])

    assert_equal true, (tryOnRedis 'exists', "test_query_combiner:id000")
    # Wait past the 5-second query_ttl / remove_interval configured above so
    # the key is expected to be gone.
    sleep 6
    assert_equal false, (tryOnRedis 'exists', "test_query_combiner:id000")
  end
end