require_relative '../helper'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_exec'
require 'fileutils'
class ExecOutputTest < Test::Unit::TestCase
# Per-test preparation: initialises the Fluent test environment and
# recreates TMP_DIR as an empty directory.
def setup
  Fluent::Test.setup

  work_dir = TMP_DIR
  FileUtils.rm_rf(work_dir, secure: true)
  if File.exist?(work_dir)
    # On Windows a deleted file stays visible on the filesystem while a
    # handle to it is still open, so force a full GC pass to release any
    # leftover handles before removing the directory again.
    GC.start(full_mark: true, immediate_sweep: true)
    FileUtils.remove_entry_secure(work_dir)
  end
  FileUtils.mkdir_p(work_dir)
end
# Scratch directory for this test case. TEST_ENV_NUMBER (when set in the
# environment) is appended to the directory name.
TMP_DIR = "#{File.dirname(__FILE__)}/../tmp/out_exec#{ENV['TEST_ENV_NUMBER']}"
# Builds an output test driver wrapping Fluent::Plugin::ExecOutput and
# configures it with +config+.
#
# @param config the configuration passed to the driver's #configure
# @return whatever the driver's #configure returns
def create_driver(config)
  driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::ExecOutput)
  driver.configure(config)
end
# Fixture shared by the tests: one event time (with a millisecond part)
# and two records that differ only in the value of "k1".
#
# @return [Array] a two-element array: the event time and the records
def create_test_data
  timestamp = event_time("2011-01-02 13:14:15.123")
  payloads = %w[v1 v2].map { |value| {"k1" => value, "kx" => "vx"} }
  [timestamp, payloads]
end
# Minimal configuration: a command plus a <format> section that only lists
# the keys to emit, leaving the formatter type at the plugin's default.
DEFAULT_CONFIG_ONLY_WITH_KEYS = %[
  command cat >#{TMP_DIR}/out
  <format>
    keys ["k1", "kx"]
  </format>
]
# With only the keys configured, the plugin is expected to use the TSV
# formatter with those keys and to have no inject configuration.
test 'configure in default' do
  plugin = create_driver(DEFAULT_CONFIG_ONLY_WITH_KEYS).instance
  formatter = plugin.formatter

  assert { formatter.is_a?(Fluent::Plugin::TSVFormatter) }
  assert_equal %w[k1 kx], formatter.keys
  assert_nil plugin.inject_config
end
# Section-style configuration used as the 'with sections' data set:
# <inject> adds the tag and a formatted local time to each record, and
# <format> emits the time, tag and k1 fields as TSV.
TSV_CONFIG = %[
  command cat >#{TMP_DIR}/out
  <inject>
    tag_key tag
    time_key time
    time_format %Y-%m-%d %H:%M:%S
    localtime yes
  </inject>
  <format>
    @type tsv
    keys time, tag, k1
  </format>
]
# Same layout as the plain TSV section-style configuration, but the
# injected time format carries a millisecond part (%3N).
TSV_CONFIG_WITH_SUBSEC = %[
  command cat >#{TMP_DIR}/out
  <inject>
    tag_key tag
    time_key time
    time_format %Y-%m-%d %H:%M:%S.%3N
    localtime yes
  </inject>
  <format>
    @type tsv
    keys time, tag, k1
  </format>
]
# TSV_CONFIG plus an explicit <buffer> section. The buffer parameters must
# live inside the section for 'configure_with_compat_buffer_parameters' to
# read them back from buffer_config / buffer.
# NOTE(review): the chunk key `time` is assumed because `timekey` is set;
# confirm against the plugin's expected buffer chunk keys.
TSV_CONFIG_WITH_BUFFER = TSV_CONFIG + %[
  <buffer time>
    @type memory
    timekey 3600
    flush_thread_count 5
    chunk_limit_size 50m
    total_limit_size #{50 * 1024 * 1024 * 128}
    flush_at_shutdown yes
  </buffer>
]
# Section-style configuration selecting the JSON formatter.
JSON_CONFIG = %[
  command cat >#{TMP_DIR}/out
  <format>
    @type json
  </format>
]
# Section-style configuration selecting the MessagePack formatter.
MSGPACK_CONFIG = %[
  command cat >#{TMP_DIR}/out
  <format>
    @type msgpack
  </format>
]
# Flat-parameter configuration fragments. The tests concatenate them to
# build the 'traditional' data sets that are run alongside the
# 'with sections' configurations.

# Base fragment: buffer path, command and the localtime flag.
CONFIG_COMPAT = %[
buffer_path #{TMP_DIR}/buffer
command cat >#{TMP_DIR}/out
localtime
]
# TSV keys plus tag/time parameters, with second-resolution time format.
TSV_CONFIG_COMPAT = %[
keys "time,tag,k1"
tag_key "tag"
time_key "time"
time_format %Y-%m-%d %H:%M:%S
]
# Flat buffer parameters. The 'configure_with_compat_buffer_parameters'
# test expects them to produce the same buffer settings as
# TSV_CONFIG_WITH_BUFFER (e.g. a total limit of 50 MiB * 128).
BUFFER_CONFIG_COMPAT = %[
buffer_type memory
time_slice_format %Y%m%d%H
num_threads 5
buffer_chunk_limit 50m
buffer_queue_limit 128
flush_at_shutdown yes
]
# Same as TSV_CONFIG_COMPAT, but the time format includes milliseconds (%3N).
TSV_CONFIG_WITH_SUBSEC_COMPAT = %[
keys "time,tag,k1"
tag_key "tag"
time_key "time"
time_format %Y-%m-%d %H:%M:%S.%3N
]
# Both configuration styles must yield the same formatter keys and the
# same inject settings.
data(
  'with sections' => TSV_CONFIG,
  'traditional' => CONFIG_COMPAT + TSV_CONFIG_COMPAT
)
test 'configure for tsv' do |conf|
  plugin = create_driver(conf).instance
  assert_equal %w[time tag k1], plugin.formatter.keys

  inject = plugin.inject_config
  assert_equal "tag", inject.tag_key
  assert_equal "time", inject.time_key
  assert_equal "%Y-%m-%d %H:%M:%S", inject.time_format
  assert_equal true, inject.localtime
end
# Both configuration styles must end up with identical buffer settings.
data(
  'with sections' => TSV_CONFIG_WITH_BUFFER,
  'traditional' => CONFIG_COMPAT + TSV_CONFIG_COMPAT + BUFFER_CONFIG_COMPAT
)
test 'configure_with_compat_buffer_parameters' do |conf|
  plugin = create_driver(conf).instance
  buffer_config = plugin.buffer_config
  chunk_limit = 50 * 1024 * 1024 # "50m" expressed in bytes

  assert_equal 3600, buffer_config.timekey
  assert_equal 5, buffer_config.flush_thread_count
  assert_equal chunk_limit, plugin.buffer.chunk_limit_size
  assert_equal chunk_limit * 128, plugin.buffer.total_limit_size
  assert buffer_config.flush_at_shutdown
end
# Each fed record must be formatted as one TSV line holding the time
# (second resolution), the tag and the k1 value.
data(
  'with sections' => TSV_CONFIG,
  'traditional' => CONFIG_COMPAT + TSV_CONFIG_COMPAT
)
test 'format' do |conf|
  driver = create_driver(conf)
  time, records = create_test_data

  driver.run(default_tag: 'test') do
    records.each { |record| driver.feed(time, record) }
  end

  %w[v1 v2].each_with_index do |value, index|
    assert_equal "2011-01-02 13:14:15\ttest\t#{value}\n", driver.formatted[index]
  end
end
# With the JSON formatter, each record is emitted as its JSON dump
# followed by a newline.
data(
  'with sections' => JSON_CONFIG,
  'traditional' => CONFIG_COMPAT + "format json"
)
test 'format_json' do |conf|
  driver = create_driver(conf)
  time, records = create_test_data

  driver.run(default_tag: 'test') do
    records.each { |record| driver.feed(time, record) }
  end

  records.each_with_index do |record, index|
    assert_equal "#{Yajl.dump(record)}\n", driver.formatted[index]
  end
end
# With the MessagePack formatter, each record is emitted as its msgpack
# serialisation.
data(
  'with sections' => MSGPACK_CONFIG,
  'traditional' => CONFIG_COMPAT + "format msgpack"
)
test 'format_msgpack' do |conf|
  driver = create_driver(conf)
  time, records = create_test_data

  driver.run(default_tag: 'test') do
    records.each { |record| driver.feed(time, record) }
  end

  records.each_with_index do |record, index|
    assert_equal record.to_msgpack, driver.formatted[index]
  end
end
# Same as the 'format' test, but the time format includes milliseconds,
# so the ".123" part of the event time must appear in the output.
data(
  'with sections' => TSV_CONFIG_WITH_SUBSEC,
  'traditional' => CONFIG_COMPAT + TSV_CONFIG_WITH_SUBSEC_COMPAT
)
test 'format subsecond time' do |conf|
  driver = create_driver(conf)
  time, records = create_test_data

  driver.run(default_tag: 'test') do
    records.each { |record| driver.feed(time, record) }
  end

  %w[v1 v2].each_with_index do |value, index|
    assert_equal "2011-01-02 13:14:15.123\ttest\t#{value}\n", driver.formatted[index]
  end
end
# End-to-end check: after a flush, the configured command must have
# written both formatted lines to TMP_DIR/out.
data(
  'with sections' => TSV_CONFIG,
  'traditional' => CONFIG_COMPAT + TSV_CONFIG_COMPAT
)
test 'write' do |conf|
  driver = create_driver(conf)
  time, records = create_test_data

  driver.run(default_tag: 'test', flush: true) do
    records.each { |record| driver.feed(time, record) }
  end

  out_path = "#{TMP_DIR}/out"
  # Poll (inside the `waiting` helper, called with 10) until the output
  # file shows up.
  waiting(10, plugin: driver.instance) do
    sleep(0.1) until File.exist?(out_path)
  end
  assert_equal true, File.exist?(out_path)

  written = File.read(out_path)
  expected = "2011-01-02 13:14:15\ttest\tv1\n" \
             "2011-01-02 13:14:15\ttest\tv2\n"
  assert_equal expected, written
end
sub_test_case 'when executed process dies unexpectedly' do
setup do
  # Lambda producing a configuration whose command behaves according to
  # +num+: with 0 it copies the file given as its second argument to
  # stdout (redirected to TMP_DIR/fail_out); with any other value it
  # sleeps briefly and exits with that status, leaving fail_out empty.
  # NOTE(review): the `command` line was reconstructed from what the two
  # tests in this sub-case expect (fail_out holding the chunk data for 0;
  # "status=3" logged and an empty fail_out for 3). It assumes the plugin
  # appends the chunk file path as the last command argument — confirm.
  @gen_config = ->(num){ <<~EOC
    command ruby -e "ARGV.first.to_i == 0 ? File.open(ARGV[1]){|f| STDOUT.write(f.read); STDOUT.flush} : (sleep 1 ; exit ARGV.first.to_i)" #{num} >#{TMP_DIR}/fail_out
    <inject>
      tag_key tag
      time_key time
      time_format %Y-%m-%d %H:%M:%S
      localtime yes
    </inject>
    <format>
      @type tsv
      keys time, tag, k1
    </format>
  EOC
  }
end
# When the command built for 0 has produced fail_out, the file must hold
# both lines and the chunk must be gone from both the buffer queue and
# the dequeued-chunk list.
test 'flushed chunk will be committed after child process successfully exits' do
  driver = create_driver(@gen_config.call(0))
  time, records = create_test_data
  out_path = "#{TMP_DIR}/fail_out"

  driver.end_if { File.exist?(out_path) }
  driver.run(default_tag: 'test', flush: true, wait_flush_completion: true, shutdown: false) do
    records.each { |record| driver.feed(time, record) }
  end

  assert { File.exist?(out_path) }

  written = File.read(out_path)
  expected = "2011-01-02 13:14:15\ttest\tv1\n" \
             "2011-01-02 13:14:15\ttest\tv2\n"
  assert_equal expected, written

  plugin = driver.instance
  assert { plugin.buffer.queue.empty? }
  assert { plugin.dequeued_chunks.empty? }
ensure
  # The run above was started with shutdown: false, so shut the plugin
  # down here even when an assertion failed.
  driver.instance_shutdown if driver && driver.instance
end
# When the command built for 3 fails, the chunk must be back in the
# buffer queue, the error (including "status=3") must be logged, and
# fail_out must exist but be empty.
test 'flushed chunk will be taken back after child process unexpectedly exits' do
  driver = create_driver(@gen_config.call(3))
  time, records = create_test_data
  out_path = "#{TMP_DIR}/fail_out"
  failure_message = "command exits with error code"

  # Stop the run as soon as the failure has been logged.
  driver.end_if do
    driver.instance.log.out.logs.any? { |line| line.include?(failure_message) }
  end
  driver.run(default_tag: 'test', flush: true, wait_flush_completion: false, shutdown: false) do
    records.each { |record| driver.feed(time, record) }
  end

  plugin = driver.instance
  assert { plugin.dequeued_chunks.empty? } # because it's already taken back
  assert { plugin.buffer.queue.size == 1 }

  logs = plugin.log.out.logs
  assert { logs.any? { |line| line.include?(failure_message) && line.include?("status=3") } }

  assert { File.exist?(out_path) && File.size(out_path) == 0 }
ensure
  # The run above was started with shutdown: false, so shut the plugin
  # down here even when an assertion failed.
  driver.instance_shutdown if driver && driver.instance
end
end
end