# -*- coding: utf-8 -*-
require 'helper'
require 'redis'
class QueryCombinerOutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end
CONFIG = %[
query_identify event_id
condition status == 'start'
replace time => time_start
condition status == 'finish'
replace time => time_finish
]
@redis
def setup
Fluent::Test.setup
@redis = Redis.new(:host => 'localhost', :port => 6379, :thread_safe => true, :db => 0)
end
def teardown
@redis.quit
end
def create_driver(conf, tag='test')
Fluent::Test::BufferedOutputTestDriver.new(Fluent::QueryCombinerOutput, tag).configure(conf)
end
def test_configure
assert_raise(Fluent::ConfigError) {
d = create_driver('')
}
# Must have and conditions
assert_raise(Fluent::ConfigError) {
d = create_driver %[
query_identify event_id
]
}
assert_raise(Fluent::ConfigError) {
d = create_driver %[
query_identify event_id
condition status == 'start'
]
}
assert_raise(Fluent::ConfigError) {
d = create_driver %[
query_identify event_id
condition status == 'finish'
]
}
# `replace` configuration only allowed in and
assert_raise(Fluent::ConfigError) {
d = create_driver %[
query_identify event_id
condition status == 'start'
replace hoge => hoge_start
condition status == 'error'
replace hoge => hoge_error
condition status == 'finish'
replace hoge => hoge_finish
]
}
end
def test_readme_sample_basic_example
d = create_driver %[
query_identify event_id
query_ttl 3600 # time to live[sec]
buffer_size 1000 # queries
condition status == 'event-start'
condition status == 'event-finish'
]
time = Time.now.to_i
d.emit({"event_id"=>"01234567", "status"=>"event-start", "started_at"=>"2001-02-03T04:05:06Z"}, time)
d.emit({"event_id"=>"01234567", "status"=>"event-finish", "finished_at"=>"2001-02-03T04:05:06Z"}, time)
d.run
assert_equal d.emits.length, 1
assert_equal d.emits[0][2], {
"event_id"=>"01234567",
"status"=>"event-finish",
"started_at"=>"2001-02-03T04:05:06Z",
"finished_at"=>"2001-02-03T04:05:06Z"}
end
def test_readme_sample_replace_sentence
d = create_driver %[
query_identify event_id
query_ttl 3600 # time to live[sec]
buffer_size 1000 # queries
condition status == 'event-start'
replace time => time_start
condition status == 'event-finish'
replace time => time_finish
]
time = Time.now.to_i
d.emit({"event_id"=>"01234567", "status"=>"event-start", "time"=>"2001-02-03T04:05:06Z"}, time)
d.emit({"event_id"=>"01234567", "status"=>"event-finish", "time"=>"2001-02-03T04:15:11Z"}, time)
d.run
assert_equal d.emits.length, 1
assert_equal d.emits[0][2], {
"event_id"=>"01234567",
"status"=>"event-finish",
"time_start"=>"2001-02-03T04:05:06Z",
"time_finish"=>"2001-02-03T04:15:11Z"}
end
def test_readme_sample_release
d = create_driver %[
query_identify event_id
query_ttl 3600 # time to live[sec]
buffer_size 1000 # queries
condition status == 'event-start'
condition status == 'event-finish'
condition status == 'event-error'
]
time = Time.now.to_i
d.emit({"event_id"=>"01234567", "status"=>"event-start", "time"=>"2001-02-03T04:05:06Z"}, time)
d.emit({"event_id"=>"01234567", "status"=>"event-error", "time"=>"2001-02-03T04:05:06Z"}, time)
d.run
assert_equal d.emits.length, 0
end
def test_readme_sample_prolong
d = create_driver %[
query_identify event_id
query_ttl 3600 # time to live[sec]
buffer_size 1000 # queries
condition status == 'event-start'
condition status == 'event-finish'
condition status == 'event-continue'
condition status == 'event-error'
]
time = Time.now.to_i
d.emit({"event_id"=>"01234567", "status"=>"event-start", "time"=>"2001-02-03T04:05:06Z"}, time)
d.emit({"event_id"=>"01234567", "status"=>"event-continue", "time"=>"2001-02-03T04:05:07Z"}, time)
d.emit({"event_id"=>"01234567", "status"=>"event-continue", "time"=>"2001-02-03T04:05:08Z"}, time)
d.emit({"event_id"=>"01234567", "status"=>"event-continue", "time"=>"2001-02-03T04:05:09Z"}, time)
d.emit({"event_id"=>"01234567", "status"=>"event-continue", "time"=>"2001-02-03T04:05:10Z"}, time)
d.emit({"event_id"=>"01234567", "status"=>"event-finish", "time"=>"2001-02-03T04:05:11Z"}, time)
d.run
assert_equal d.emits.length, 1
assert_equal d.emits[0][2], {
"event_id"=>"01234567",
"status"=>"event-finish",
"time"=>"2001-02-03T04:05:11Z"}
end
def test_simple_events
d = create_driver CONFIG
time = Time.now.to_i
d.emit({"event_id"=>"001", "status"=>"start", "time"=>"21:00"}, time)
d.emit({"event_id"=>"002", "status"=>"start", "time"=>"22:00"}, time)
d.emit({"event_id"=>"001", "status"=>"finish", "time"=>"23:00"}, time)
d.emit({"event_id"=>"002", "status"=>"finish", "time"=>"24:00"}, time)
d.run
assert_equal d.emits[0][2], {
"event_id"=>"001",
"status"=>"finish",
"time_start"=>"21:00",
"time_finish"=>"23:00"}
assert_equal d.emits[1][2], {
"event_id"=>"002",
"status"=>"finish",
"time_start"=>"22:00",
"time_finish"=>"24:00"}
end
def test_catch_dump_release
d = create_driver %[
buffer_size 1001
query_identify event_id
condition status == 'start'
replace time => time_start
condition status == 'finish'
replace time => time_finish
condition status == 'error'
]
def emit(d, event_id, status, t)
d.emit({"event_id"=>event_id, "status"=>status, "time"=>t}, Time.now.to_i)
end
(0..1000).each { |num|
emit(d, num, "start", "21:00")
}
finish_list = []
(0..1000).each { |num|
status = if rand >= 0.5 then
finish_list.push(num)
"finish"
else
"error"
end
emit(d, num, status, "22:00")
}
d.run
finish_list.each_with_index { |num, index|
assert_equal d.emits[index][2], {
"event_id" => num,
"status" => "finish",
"time_start" => "21:00",
"time_finish" => "22:00",
}
}
assert_equal d.emits.size, finish_list.size
end
def test_multi_query_identifier
d = create_driver %[
buffer_size 1001
query_identify aid, bid, cid
condition status == 'start'
condition status == 'finish'
]
def emit(d, aid, bid, cid, status, t)
d.emit(
{"aid"=>aid, "bid"=>bid, "cid"=>cid, "status"=>status, "time"=>t},
Time.now.to_i
)
end
finish_list = []
(0..1000).each { |num|
aid = (rand * 1000).to_i
bid = (rand * 1000).to_i
cid = (rand * 1000).to_i
emit(d, aid, bid, cid, "start", "22:00")
finish_list.push([aid, bid, cid])
}
t_list = []
finish_list.each { |ids|
t = (rand * 100000).to_i
emit(d, ids[0], ids[1], ids[2], "finish", t)
t_list.push(t)
}
d.run
finish_list.each_with_index { |ids, index|
assert_equal d.emits[index][2], {
"aid" => ids[0],
"bid" => ids[1],
"cid" => ids[2],
"status" => "finish",
"time" => t_list[index]
}
}
assert_equal d.emits.size, finish_list.size
end
def test_buffer_size
end
def test_query_ttl
end
end