require_relative 'helper'
require 'rr'
require 'fluent/plugin/in_cat_sweep'
# Tests for Fluent::Plugin::CatSweepInput: configuration validation, the
# records emitted for several `format` values, and what is left on disk after
# a source file has been handled.
#
# The text inside each multi-line %[...] literal in this class starts at
# column 0 so that the configuration strings carry no indentation.
class CatSweepInputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  # Directory the plugin's glob points at; every test writes its input here.
  TMP_DIR_FROM = '/tmp/fluent_plugin_test_in_cat_sweep_from'
  # Directory handed to `move_to` by the tests that use that option.
  TMP_DIR_TO = '/tmp/fluent_plugin_test_in_cat_sweep_to'

  # Settings shared by every configuration built in this class.
  CONFIG_BASE = %[
file_path_with_glob #{TMP_DIR_FROM}/*
run_interval 0.05
]

  # Smallest configuration that test_required_configure expects to be accepted.
  CONFIG_MINIMUM_REQUIRED = CONFIG_BASE + %[
@type tsv
keys ""
waiting_seconds 3
]

  # Same as above, but selecting the parser with `format` instead of `@type`.
  CONFIG_MINIMUM_REQUIRED_IN_OLD_STYLE = CONFIG_BASE + %[
format tsv
keys ""
waiting_seconds 4
]

  # Fixtures keyed by `format` value. Each entry holds the raw line written to
  # the source file ('msg') and the value expected under the 'message' key of
  # the corresponding emitted record ('expected').
  TEST_CASES =
    {
      'none' => [
        {'msg' => "tcptest1\n", 'expected' => 'tcptest1'},
        {'msg' => "tcptest2\n", 'expected' => 'tcptest2'},
      ],
      'tsv' => [
        {'msg' => "t.e.s.t.1\t12345\ttcptest1\t{\"json\":1}\n", 'expected' => '{"json":1}'},
        {'msg' => "t.e.s.t.2\t54321\ttcptest2\t{\"json\":\"char\"}\n", 'expected' => '{"json":"char"}'},
      ],
      'json' => [
        {'msg' => {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", 'expected' => 'tcptest1'},
        {'msg' => {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", 'expected' => 456},
      ]
    }

  # Prepares the Fluentd test environment and two empty working directories.
  def setup
    Fluent::Test.setup
    # Drop anything an interrupted earlier run may have left behind, so a test
    # only ever sees the files it wrote itself.
    FileUtils.rm_rf([TMP_DIR_FROM, TMP_DIR_TO])
    FileUtils.mkdir_p(TMP_DIR_FROM)
    FileUtils.mkdir_p(TMP_DIR_TO)
  end

  # Removes the working directories. rm_rf (rather than rm_r) is used so a
  # directory that is already missing does not raise and hide the real failure.
  def teardown
    FileUtils.rm_rf([TMP_DIR_FROM, TMP_DIR_TO])
  end

  # Builds an input test driver for the plugin, configured with +conf+.
  def create_driver(conf)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::CatSweepInput).configure(conf)
  end

  # Concatenates the 'msg' values of +test_cases+ in order, i.e. the exact
  # content written to (and later expected in) a source file.
  def joined_messages(test_cases)
    test_cases.map { |test| test['msg'] }.join
  end

  # Creates (or truncates) the file +name+ inside TMP_DIR_FROM and fills it
  # with the 'msg' lines of +test_cases+.
  def write_source_file(name, test_cases)
    File.write("#{TMP_DIR_FROM}/#{name}", joined_messages(test_cases))
  end

  # Asserts that +events+ matches +tests+ one-to-one: same number of entries,
  # and for each pair the record's 'message' equals the fixture's 'expected'.
  # Element 2 of every event is treated as the record hash.
  def compare_test_result(events, tests)
    # Without this check an empty (or short) event list would pass silently.
    assert_equal(tests.size, events.size, 'unexpected number of emitted events')
    tests.zip(events).each do |test, event|
      assert { test['expected'] == event[2]['message'] }
    end
  end

  # Incomplete configurations must raise Fluent::ConfigError; the two minimal
  # configurations must be accepted and populate the plugin's settings.
  def test_required_configure
    assert_raise(Fluent::ConfigError) do
      create_driver(%[])
    end
    assert_raise(Fluent::ConfigError) do
      create_driver(CONFIG_BASE)
    end
    # Parser settings alone, without waiting_seconds, are still rejected.
    assert_raise(Fluent::ConfigError) do
      create_driver(CONFIG_BASE + %[
@type tsv
keys ""
])
    end

    [
      [CONFIG_MINIMUM_REQUIRED, 3],
      [CONFIG_MINIMUM_REQUIRED_IN_OLD_STYLE, 4],
    ].each do |conf, waiting_seconds|
      plugin = create_driver(conf).instance
      assert_equal "#{TMP_DIR_FROM}/*", plugin.instance_variable_get(:@file_path_with_glob)
      assert_equal Fluent::Plugin::TSVParser, plugin.instance_variable_get(:@parser).class
      assert_equal waiting_seconds, plugin.instance_variable_get(:@waiting_seconds)
    end
  end

  # file_event_stream is false unless the configuration turns it on.
  def test_configure_file_event_stream
    d = create_driver(CONFIG_MINIMUM_REQUIRED)
    assert { false == d.instance.file_event_stream }
    d = create_driver(CONFIG_MINIMUM_REQUIRED + %[file_event_stream true])
    assert { true == d.instance.file_event_stream }
  end

  # Generates one test per (file_event_stream, format) combination. Each test
  # writes the fixture lines to a file, runs the plugin, checks the emitted
  # messages and verifies that no file with that name remains in TMP_DIR_FROM.
  [false, true].each do |file_event_stream|
    TEST_CASES.each do |format, test_cases|
      test_case_name = "test_msg_process_#{format}_file_event_stream_#{file_event_stream}"
      define_method(test_case_name) do
        write_source_file(test_case_name, test_cases)
        d = create_driver(CONFIG_BASE + %[
format #{format}
file_event_stream #{file_event_stream}
waiting_seconds 0
keys hdfs_path,unixtimestamp,label,message
])
        d.run(timeout: 1, expect_records: test_cases.size) # timeout = waiting_seconds + 1
        compare_test_result(d.events, test_cases)
        assert { Dir.glob("#{TMP_DIR_FROM}/#{test_case_name}*").empty? }
      end
    end
  end

  # With move_to set, the handled file must disappear from TMP_DIR_FROM and
  # show up, content intact, below TMP_DIR_TO followed by its original
  # directory path.
  def test_move_file
    test_cases = TEST_CASES['tsv']
    write_source_file('test_move_file', test_cases)
    d = create_driver(CONFIG_BASE + %[
format tsv
waiting_seconds 0
keys hdfs_path,unixtimestamp,label,message
move_to #{TMP_DIR_TO}
])
    d.run(timeout: 1, expect_records: test_cases.size) # timeout = waiting_seconds + 1
    compare_test_result(d.events, test_cases)

    assert(Dir.glob("#{TMP_DIR_FROM}/test_move_file*").empty?)
    moved = Dir.glob("#{TMP_DIR_TO}#{TMP_DIR_FROM}/test_move_file*").first
    assert_not_nil(moved, 'no moved file found below the move_to directory')
    assert_match(%r{\A#{TMP_DIR_TO}#{TMP_DIR_FROM}/test_move_file}, moved)
    assert_equal(joined_messages(test_cases), File.read(moved))
  end

  # With oneline_max_bytes 1, the source file must stay in TMP_DIR_FROM under
  # a name ending in ".error", with its content unchanged.
  def test_oneline_max_bytes
    test_cases = TEST_CASES['tsv']
    write_source_file('test_oneline_max_bytes', test_cases)
    d = create_driver(CONFIG_BASE + %[
format tsv
waiting_seconds 0
keys hdfs_path,unixtimestamp,label,message
move_to #{TMP_DIR_TO}
oneline_max_bytes 1
])
    # NOTE(review): every fixture line is longer than 1 byte, so presumably no
    # record is emitted and this run ends on the timeout - confirm.
    d.run(timeout: 1, expect_records: 2) # timeout = waiting_seconds + 1

    errored = Dir.glob("#{TMP_DIR_FROM}/test_oneline_max_bytes*").first
    assert_not_nil(errored, 'source file is no longer present in the source directory')
    assert_match(%r{\A#{TMP_DIR_FROM}/test_oneline_max_bytes.*\.error}, errored)
    assert_equal(joined_messages(test_cases), File.read(errored))
  end
end