require_relative '../helper' require 'fluent/test/driver/output' require 'fluent/plugin/out_file' require 'fileutils' require 'time' require 'timecop' require 'zlib' require 'fluent/file_wrapper' class FileOutputTest < Test::Unit::TestCase def setup Fluent::Test.setup FileUtils.rm_rf(TMP_DIR) FileUtils.mkdir_p(TMP_DIR) @default_newline = if Fluent.windows? "\r\n" else "\n" end end TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_file#{ENV['TEST_ENV_NUMBER']}") CONFIG = %[ path #{TMP_DIR}/out_file_test compress gz utc timekey_use_utc true ] def create_driver(conf = CONFIG, opts = {}) Fluent::Test::Driver::Output.new(Fluent::Plugin::FileOutput, opts: opts).configure(conf) end sub_test_case 'configuration' do test 'basic configuration' do d = create_driver %[ path test_path compress gz ] assert_equal 'test_path', d.instance.path assert_equal :gz, d.instance.compress assert_equal :gzip, d.instance.instance_eval{ @compress_method } end test 'using root_dir for buffer path' do system_conf_opts = {'root_dir' => File.join(TMP_DIR, 'testrootdir')} buf_conf = config_element('buffer', '', {'flush_interval' => '1s'}) conf = config_element('match', '**', {'@id' => 'myout', 'path' => 'test_path', 'append' => 'true'}, [buf_conf]) d = create_driver(conf, system_conf_opts) assert_equal 'test_path', d.instance.path assert d.instance.append assert d.instance.buffer.respond_to?(:path) # file buffer assert_equal 1, d.instance.buffer_config.flush_interval assert_equal File.join(TMP_DIR, 'testrootdir', 'worker0', 'myout'), d.instance.plugin_root_dir buffer_path_under_root_dir = File.join(TMP_DIR, 'testrootdir', 'worker0', 'myout', 'buffer', 'buffer.*.log') assert_equal buffer_path_under_root_dir, d.instance.buffer.path end test 'path should be writable' do assert_raise(Fluent::ConfigError.new("'path' parameter is required")) do create_driver "" end assert_nothing_raised do create_driver %[path #{TMP_DIR}/test_path] end assert_nothing_raised do FileUtils.mkdir_p("#{TMP_DIR}/test_dir") File.chmod(0777, "#{TMP_DIR}/test_dir") create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] end if Process.uid.nonzero? assert_raise(Fluent::ConfigError) do FileUtils.mkdir_p("#{TMP_DIR}/test_dir") File.chmod(0555, "#{TMP_DIR}/test_dir") create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] end end end test 'default timezone is localtime' do d = create_driver(%[path #{TMP_DIR}/out_file_test]) time = event_time("2011-01-02 13:14:15 UTC") with_timezone(Fluent.windows? ? 'NST-8' : 'Asia/Taipei') do d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) end end assert_equal 1, d.formatted.size assert_equal %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}#{@default_newline}], d.formatted[0] end test 'no configuration error raised for basic configuration using "*" (v0.12 style)' do conf = config_element('match', '**', { 'path' => "#{TMP_DIR}/test_out.*.log", 'time_slice_format' => '%Y%m%d', }) assert_nothing_raised do create_driver(conf) end end if Process.uid.nonzero? test 'configuration error raised if specified directory via template is not writable' do Timecop.freeze(Time.parse("2016-10-04 21:33:27 UTC")) do conf = config_element('match', '**', { 'path' => "#{TMP_DIR}/prohibited/${tag}/file.%Y%m%d.log", }, [ config_element('buffer', 'time,tag', {'timekey' => 86400, 'timekey_zone' => '+0000'}) ]) FileUtils.mkdir_p("#{TMP_DIR}/prohibited") File.chmod(0555, "#{TMP_DIR}/prohibited") assert_raise Fluent::ConfigError.new("out_file: `#{TMP_DIR}/prohibited/a/file.20161004.log_**.log` is not writable") do create_driver(conf) end end end end test 'configuration using inject/format/buffer sections fully' do conf = config_element('match', '**', { 'path' => "#{TMP_DIR}/${tag}/${type}/conf_test.%Y%m%d.%H%M.log", 'add_path_suffix' => 'false', 'append' => "true", 'symlink_path' => "#{TMP_DIR}/conf_test.current.log", 'compress' => 'gzip', 'recompress' => 'true', }, [ config_element('inject', '', { 'hostname_key' => 'hostname', 'hostname' => 'testing.local', 'tag_key' => 'tag', 'time_key' => 'time', 'time_type' => 'string', 'time_format' => '%Y/%m/%d %H:%M:%S %z', 'timezone' => '+0900', }), config_element('format', '', { '@type' => 'out_file', 'include_tag' => 'true', 'include_time' => 'true', 'delimiter' => 'COMMA', 'time_type' => 'string', 'time_format' => '%Y-%m-%d %H:%M:%S %z', 'utc' => 'true', }), config_element('buffer', 'time,tag,type', { '@type' => 'file', 'timekey' => '15m', 'timekey_wait' => '5s', 'timekey_zone' => '+0000', 'path' => "#{TMP_DIR}/buf_conf_test", 'chunk_limit_size' => '50m', 'total_limit_size' => '1g', 'compress' => 'gzip', }), ]) assert_nothing_raised do create_driver(conf) end end test 'configured as secondary with primary using chunk_key_tag and not using chunk_key_time' do require 'fluent/plugin/out_null' conf = config_element('match', '**', { }, [ config_element('buffer', 'tag', { }), config_element('secondary', '', { '@type' => 'file', 'path' => "#{TMP_DIR}/testing_to_dump_by_out_file", }), ]) assert_nothing_raised do Fluent::Test::Driver::Output.new(Fluent::Plugin::NullOutput).configure(conf) end end end sub_test_case 'fully configured output' do setup do Timecop.freeze(Time.parse("2016-10-03 23:58:00 UTC")) conf = config_element('match', '**', { 'path' => "#{TMP_DIR}/${tag}/${type}/full.%Y%m%d.%H%M.log", 'add_path_suffix' => 'false', 'append' => "true", 'symlink_path' => "#{TMP_DIR}/full.current.log", 'compress' => 'gzip', 'recompress' => 'true', }, [ config_element('inject', '', { 'hostname_key' => 'hostname', 'hostname' => 'testing.local', 'tag_key' => 'tag', 'time_key' => 'time', 'time_type' => 'string', 'time_format' => '%Y/%m/%d %H:%M:%S %z', 'timezone' => '+0900', }), config_element('format', '', { '@type' => 'out_file', 'include_tag' => 'true', 'include_time' => 'true', 'delimiter' => 'COMMA', 'time_type' => 'string', 'time_format' => '%Y-%m-%d %H:%M:%S %z', 'utc' => 'true', }), config_element('buffer', 'time,tag,type', { '@type' => 'file', 'timekey' => '15m', 'timekey_wait' => '5s', 'timekey_zone' => '+0000', 'path' => "#{TMP_DIR}/buf_full", 'chunk_limit_size' => '50m', 'total_limit_size' => '1g', 'compress' => 'gzip', }), ]) @d = create_driver(conf) end teardown do FileUtils.rm_rf("#{TMP_DIR}/buf_full") FileUtils.rm_rf("#{TMP_DIR}/my.data") FileUtils.rm_rf("#{TMP_DIR}/your.data") FileUtils.rm_rf("#{TMP_DIR}/full.current.log") Timecop.return end test 'can format/write data correctly' do d = @d assert_equal 50*1024*1024, d.instance.buffer.chunk_limit_size assert_equal 1*1024*1024*1024, d.instance.buffer.total_limit_size assert !(File.symlink?("#{TMP_DIR}/full.current.log")) t1 = event_time("2016-10-03 23:58:09 UTC") t2 = event_time("2016-10-03 23:59:33 UTC") t3 = event_time("2016-10-03 23:59:57 UTC") t4 = event_time("2016-10-04 00:00:17 UTC") t5 = event_time("2016-10-04 00:01:59 UTC") Timecop.freeze(Time.parse("2016-10-03 23:58:30 UTC")) d.run(start: true, flush: false, shutdown: false) do d.feed('my.data', t1, {"type" => "a", "message" => "data raw content"}) d.feed('my.data', t2, {"type" => "a", "message" => "data raw content"}) d.feed('your.data', t3, {"type" => "a", "message" => "data raw content"}) end assert_equal 3, d.formatted.size assert Dir.exist?("#{TMP_DIR}/buf_full") assert !(Dir.exist?("#{TMP_DIR}/my.data/a")) assert !(Dir.exist?("#{TMP_DIR}/your.data/a")) buffer_files = Dir.entries("#{TMP_DIR}/buf_full").reject{|e| e =~ /^\.+$/ } assert_equal 2, buffer_files.select{|n| n.end_with?('.meta') }.size assert_equal 2, buffer_files.select{|n| !n.end_with?('.meta') }.size m1 = d.instance.metadata('my.data', t1, {"type" => "a"}) m2 = d.instance.metadata('your.data', t3, {"type" => "a"}) assert_equal 2, d.instance.buffer.stage.size b1_path = d.instance.buffer.stage[m1].path b1_size = File.lstat(b1_path).size unless Fluent.windows? assert File.symlink?("#{TMP_DIR}/full.current.log") assert_equal d.instance.buffer.stage[m2].path, File.readlink("#{TMP_DIR}/full.current.log") end Timecop.freeze(Time.parse("2016-10-04 00:00:06 UTC")) d.run(start: false, flush: true, shutdown: true) do d.feed('my.data', t4, {"type" => "a", "message" => "data raw content"}) d.feed('your.data', t5, {"type" => "a", "message" => "data raw content"}) end assert Dir.exist?("#{TMP_DIR}/buf_full") assert Dir.exist?("#{TMP_DIR}/my.data/a") assert Dir.exist?("#{TMP_DIR}/your.data/a") buffer_files = Dir.entries("#{TMP_DIR}/buf_full").reject{|e| e =~ /^\.+$/ } assert_equal 0, buffer_files.size assert File.exist?("#{TMP_DIR}/my.data/a/full.20161003.2345.log.gz") assert File.exist?("#{TMP_DIR}/my.data/a/full.20161004.0000.log.gz") assert File.exist?("#{TMP_DIR}/your.data/a/full.20161003.2345.log.gz") assert File.exist?("#{TMP_DIR}/your.data/a/full.20161004.0000.log.gz") assert{ File.lstat("#{TMP_DIR}/my.data/a/full.20161003.2345.log.gz").size < b1_size } # recompress assert_equal 5, d.formatted.size r1 = %!2016-10-03 23:58:09 +0000,my.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"my.data","time":"2016/10/04 08:58:09 +0900"}#{@default_newline}! r2 = %!2016-10-03 23:59:33 +0000,my.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"my.data","time":"2016/10/04 08:59:33 +0900"}#{@default_newline}! r3 = %!2016-10-03 23:59:57 +0000,your.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"your.data","time":"2016/10/04 08:59:57 +0900"}#{@default_newline}! r4 = %!2016-10-04 00:00:17 +0000,my.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"my.data","time":"2016/10/04 09:00:17 +0900"}#{@default_newline}! r5 = %!2016-10-04 00:01:59 +0000,your.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"your.data","time":"2016/10/04 09:01:59 +0900"}#{@default_newline}! assert_equal r1, d.formatted[0] assert_equal r2, d.formatted[1] assert_equal r3, d.formatted[2] assert_equal r4, d.formatted[3] assert_equal r5, d.formatted[4] read_gunzip = ->(path){ File.open(path){ |fio| Zlib::GzipReader.new(StringIO.new(fio.read)).read } } assert_equal r1 + r2, read_gunzip.call("#{TMP_DIR}/my.data/a/full.20161003.2345.log.gz") assert_equal r3, read_gunzip.call("#{TMP_DIR}/your.data/a/full.20161003.2345.log.gz") assert_equal r4, read_gunzip.call("#{TMP_DIR}/my.data/a/full.20161004.0000.log.gz") assert_equal r5, read_gunzip.call("#{TMP_DIR}/your.data/a/full.20161004.0000.log.gz") end end sub_test_case 'format' do test 'timezone UTC specified' do d = create_driver time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) end assert_equal 2, d.formatted.size assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}], d.formatted[0] assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}], d.formatted[1] end test 'time formatted with specified timezone, using area name' do d = create_driver %[ path #{TMP_DIR}/out_file_test timezone Asia/Taipei ] time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) end assert_equal 1, d.formatted.size assert_equal %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}#{@default_newline}], d.formatted[0] end test 'time formatted with specified timezone, using offset' do d = create_driver %[ path #{TMP_DIR}/out_file_test timezone -03:30 ] time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) end assert_equal 1, d.formatted.size assert_equal %[2011-01-02T09:44:15-03:30\ttest\t{"a":1}#{@default_newline}], d.formatted[0] end test 'configuration error raised for invalid timezone' do assert_raise(Fluent::ConfigError) do create_driver %[ path #{TMP_DIR}/out_file_test timezone Invalid/Invalid ] end end end def check_gzipped_result(path, expect) # Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790 # Following code from https://www.ruby-forum.com/topic/971591#979520 result = '' File.open(path, "rb") { |io| loop do gzr = Zlib::GzipReader.new(StringIO.new(io.read)) result << gzr.read unused = gzr.unused gzr.finish break if unused.nil? io.pos -= unused.length end } assert_equal expect, result end def check_result(path, expect) result = File.read(path, mode: "rb") assert_equal expect, result end sub_test_case 'write' do test 'basic case' do d = create_driver assert_false File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) end assert File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") check_gzipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}]) end end sub_test_case 'file/directory permissions' do TMP_DIR_WITH_SYSTEM = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_file_system#{ENV['TEST_ENV_NUMBER']}") # 0750 interprets as "488". "488".to_i(8) # => 4. So, it makes wrong permission. Umm.... OVERRIDE_DIR_PERMISSION = 750 OVERRIDE_FILE_PERMISSION = 0620 CONFIG_WITH_SYSTEM = %[ path #{TMP_DIR_WITH_SYSTEM}/out_file_test compress gz utc timekey_use_utc true file_permission #{OVERRIDE_FILE_PERMISSION} dir_permission #{OVERRIDE_DIR_PERMISSION} ] setup do omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? FileUtils.rm_rf(TMP_DIR_WITH_SYSTEM) end def parse_system(text) basepath = File.expand_path(File.dirname(__FILE__) + '/../../') Fluent::Config.parse(text, '(test)', basepath, true).elements.find { |e| e.name == 'system' } end test 'write to file with permission specifications' do system_conf = parse_system(CONFIG_WITH_SYSTEM) sc = Fluent::SystemConfig.new(system_conf) Fluent::Engine.init(sc) d = create_driver CONFIG_WITH_SYSTEM assert_false File.exist?("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz") time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) end assert File.exist?("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz") check_gzipped_result("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) dir_mode = "%o" % File::stat(TMP_DIR_WITH_SYSTEM).mode assert_equal(OVERRIDE_DIR_PERMISSION, dir_mode[-3, 3].to_i) file_mode = "%o" % File::stat("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz").mode assert_equal(OVERRIDE_FILE_PERMISSION, file_mode[-3, 3].to_i) end end sub_test_case 'format specified' do test 'json' do d = create_driver [CONFIG, 'format json', 'include_time_key true', 'time_as_epoch'].join("\n") time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) end path = d.instance.last_written_path check_gzipped_result(path, %[#{Yajl.dump({"a" => 1, 'time' => time.to_i})}#{@default_newline}] + %[#{Yajl.dump({"a" => 2, 'time' => time.to_i})}#{@default_newline}]) end test 'ltsv' do d = create_driver [CONFIG, 'format ltsv', 'include_time_key true'].join("\n") time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) end path = d.instance.last_written_path check_gzipped_result(path, %[a:1\ttime:2011-01-02T13:14:15Z#{@default_newline}] + %[a:2\ttime:2011-01-02T13:14:15Z#{@default_newline}]) end test 'single_value' do d = create_driver [CONFIG, 'format single_value', 'message_key a'].join("\n") time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) end path = d.instance.last_written_path check_gzipped_result(path, %[1#{@default_newline}] + %[2#{@default_newline}]) end end test 'path with index number' do time = event_time("2011-01-02 13:14:15 UTC") formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}] write_once = ->(){ d = create_driver d.run(default_tag: 'test'){ d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) } d.instance.last_written_path } assert !File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102_0.log.gz", path check_gzipped_result(path, formatted_lines) assert_equal 1, Dir.glob("#{TMP_DIR}/out_file_test.*").size path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102_1.log.gz", path check_gzipped_result(path, formatted_lines) assert_equal 2, Dir.glob("#{TMP_DIR}/out_file_test.*").size path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102_2.log.gz", path check_gzipped_result(path, formatted_lines) assert_equal 3, Dir.glob("#{TMP_DIR}/out_file_test.*").size end data( "with compression" => true, "without compression" => false, ) test 'append' do |compression| time = event_time("2011-01-02 13:14:15 UTC") formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}] write_once = ->(){ config = %[ path #{TMP_DIR}/out_file_test utc append true timekey_use_utc true ] if compression config << " compress gz" end d = create_driver(config) d.run(default_tag: 'test'){ d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) } d.instance.last_written_path } log_file_name = "out_file_test.20110102.log" if compression log_file_name << ".gz" end 1.upto(3) do |i| path = write_once.call assert_equal "#{TMP_DIR}/#{log_file_name}", path expect = formatted_lines * i if compression check_gzipped_result(path, expect) else check_result(path, expect) end end end test 'append when JST' do with_timezone(Fluent.windows? ? "JST-9" : "Asia/Tokyo") do time = event_time("2011-01-02 03:14:15+09:00") formatted_lines = %[2011-01-02T03:14:15+09:00\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T03:14:15+09:00\ttest\t{"a":2}#{@default_newline}] write_once = ->(){ d = create_driver %[ path #{TMP_DIR}/out_file_test compress gz append true timekey_use_utc false timekey_zone Asia/Tokyo ] d.run(default_tag: 'test'){ d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) } d.instance.last_written_path } path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path check_gzipped_result(path, formatted_lines) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path check_gzipped_result(path, formatted_lines * 2) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path check_gzipped_result(path, formatted_lines * 3) end end test 'append when UTC-02 but timekey_zone is +0900' do with_timezone("UTC-02") do # +0200 time = event_time("2011-01-02 17:14:15+02:00") formatted_lines = %[2011-01-02T17:14:15+02:00\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T17:14:15+02:00\ttest\t{"a":2}#{@default_newline}] write_once = ->(){ d = create_driver %[ path #{TMP_DIR}/out_file_test compress gz append true timekey_use_utc false timekey_zone +0900 ] d.run(default_tag: 'test'){ d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) } d.instance.last_written_path } path = write_once.call # Rotated at 2011-01-02 17:00:00+02:00 assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path check_gzipped_result(path, formatted_lines) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path check_gzipped_result(path, formatted_lines * 2) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path check_gzipped_result(path, formatted_lines * 3) end end test '${chunk_id}' do time = event_time("2011-01-02 13:14:15 UTC") write_once = ->(){ d = create_driver %[ path #{TMP_DIR}/out_file_chunk_id_${chunk_id} utc append true timekey_use_utc true ] d.run(default_tag: 'test'){ d.feed(time, {"a"=>1}) d.feed(time, {"a"=>2}) } d.instance.last_written_path } path = write_once.call if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).20110102.log/ unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate) assert_equal unique_id.size, $1.size, "chunk_id size is mismatched" else flunk "chunk_id is not included in the path" end end SYMLINK_PATH = File.expand_path("#{TMP_DIR}/current") sub_test_case 'symlink' do test 'static symlink' do omit "Windows doesn't support symlink" if Fluent.windows? conf = CONFIG + %[ symlink_path #{SYMLINK_PATH} ] symlink_path = "#{SYMLINK_PATH}" d = create_driver(conf) begin run_and_check(d, symlink_path) ensure FileUtils.rm_rf(symlink_path) end end test 'symlink with placeholders' do omit "Windows doesn't support symlink" if Fluent.windows? conf = %[ path #{TMP_DIR}/${tag}/out_file_test symlink_path #{SYMLINK_PATH}/foo/${tag} ] symlink_path = "#{SYMLINK_PATH}/foo/tag" d = create_driver(conf) begin run_and_check(d, symlink_path) ensure FileUtils.rm_rf(symlink_path) end end def run_and_check(d, symlink_path) d.run(default_tag: 'tag') do es = Fluent::OneEventStream.new(event_time("2011-01-02 13:14:15 UTC"), {"a"=>1}) d.feed(es) assert File.symlink?(symlink_path) assert File.exist?(symlink_path) # This checks dest of symlink exists or not. es = Fluent::OneEventStream.new(event_time("2011-01-03 14:15:16 UTC"), {"a"=>2}) d.feed(es) assert File.symlink?(symlink_path) assert File.exist?(symlink_path) meta = d.instance.metadata('tag', event_time("2011-01-03 14:15:16 UTC"), {}) assert_equal d.instance.buffer.instance_eval{ @stage[meta].path }, File.readlink(symlink_path) end end end sub_test_case 'path' do test 'normal' do d = create_driver(%[ path #{TMP_DIR}/out_file_test time_slice_format %Y-%m-%d-%H utc true ]) time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) end path = d.instance.last_written_path assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13_0.log", path end test 'normal with append' do d = create_driver(%[ path #{TMP_DIR}/out_file_test time_slice_format %Y-%m-%d-%H utc true append true ]) time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) end path = d.instance.last_written_path assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13.log", path end test '*' do d = create_driver(%[ path #{TMP_DIR}/out_file_test.*.txt time_slice_format %Y-%m-%d-%H utc true ]) time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) end path = d.instance.last_written_path assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13_0.txt", path end test '* with append' do d = create_driver(%[ path #{TMP_DIR}/out_file_test.*.txt time_slice_format %Y-%m-%d-%H utc true append true ]) time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: 'test') do d.feed(time, {"a"=>1}) end path = d.instance.last_written_path assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13.txt", path end end sub_test_case '#timekey_to_timeformat' do setup do @d = create_driver @i = @d.instance end test 'returns empty string for nil' do assert_equal '', @i.timekey_to_timeformat(nil) end test 'returns timestamp string with seconds for timekey smaller than 60' do assert_equal '%Y%m%d%H%M%S', @i.timekey_to_timeformat(1) assert_equal '%Y%m%d%H%M%S', @i.timekey_to_timeformat(30) assert_equal '%Y%m%d%H%M%S', @i.timekey_to_timeformat(59) end test 'returns timestamp string with minutes for timekey smaller than 3600' do assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(60) assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(180) assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(1800) assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(3599) end test 'returns timestamp string with hours for timekey smaller than 86400 (1 day)' do assert_equal '%Y%m%d%H', @i.timekey_to_timeformat(3600) assert_equal '%Y%m%d%H', @i.timekey_to_timeformat(7200) assert_equal '%Y%m%d%H', @i.timekey_to_timeformat(86399) end test 'returns timestamp string with days for timekey equal or greater than 86400' do assert_equal '%Y%m%d', @i.timekey_to_timeformat(86400) assert_equal '%Y%m%d', @i.timekey_to_timeformat(1000000) assert_equal '%Y%m%d', @i.timekey_to_timeformat(1000000000) end end sub_test_case '#compression_suffix' do setup do @i = create_driver.instance end test 'returns empty string for nil (no compression method specified)' do assert_equal '', @i.compression_suffix(nil) end test 'returns .gz for gzip' do assert_equal '.gz', @i.compression_suffix(:gzip) end end sub_test_case '#generate_path_template' do setup do @i = create_driver.instance end data( 'day' => [86400, '%Y%m%d', '%Y-%m-%d'], 'hour' => [3600, '%Y%m%d%H', '%Y-%m-%d_%H'], 'minute' => [60, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'], ) test 'generates path with timestamp placeholder for original path with tailing star with timekey' do |data| timekey, placeholder, time_slice_format = data # with index placeholder, without compression suffix when append disabled and compression disabled assert_equal "/path/to/file.#{placeholder}_**", @i.generate_path_template('/path/to/file.*', timekey, false, nil) # with index placeholder, with .gz suffix when append disabled and gzip compression enabled assert_equal "/path/to/file.#{placeholder}_**.gz", @i.generate_path_template('/path/to/file.*', timekey, false, :gzip) # without index placeholder, without compression suffix when append enabled and compression disabled assert_equal "/path/to/file.#{placeholder}", @i.generate_path_template('/path/to/file.*', timekey, true, nil) # without index placeholder, with .gz suffix when append disabled and gzip compression enabled assert_equal "/path/to/file.#{placeholder}.gz", @i.generate_path_template('/path/to/file.*', timekey, true, :gzip) # time_slice_format will used instead of computed placeholder if specified assert_equal "/path/to/file.#{time_slice_format}_**", @i.generate_path_template('/path/to/file.*', timekey, false, nil, time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}_**.gz", @i.generate_path_template('/path/to/file.*', timekey, false, :gzip, time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}", @i.generate_path_template('/path/to/file.*', timekey, true, nil, time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}.gz", @i.generate_path_template('/path/to/file.*', timekey, true, :gzip, time_slice_format: time_slice_format) end data( 'day' => [86400 * 2, '%Y%m%d', '%Y-%m-%d'], 'hour' => [7200, '%Y%m%d%H', '%Y-%m-%d_%H'], 'minute' => [180, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'], ) test 'generates path with timestamp placeholder for original path with star and suffix with timekey' do |data| timekey, placeholder, time_slice_format = data # with index placeholder, without compression suffix when append disabled and compression disabled assert_equal "/path/to/file.#{placeholder}_**.data", @i.generate_path_template('/path/to/file.*.data', timekey, false, nil) # with index placeholder, with .gz suffix when append disabled and gzip compression enabled assert_equal "/path/to/file.#{placeholder}_**.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, false, :gzip) # without index placeholder, without compression suffix when append enabled and compression disabled assert_equal "/path/to/file.#{placeholder}.data", @i.generate_path_template('/path/to/file.*.data', timekey, true, nil) # without index placeholder, with .gz suffix when append disabled and gzip compression enabled assert_equal "/path/to/file.#{placeholder}.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, true, :gzip) # time_slice_format will used instead of computed placeholder if specified assert_equal "/path/to/file.#{time_slice_format}_**.data", @i.generate_path_template('/path/to/file.*.data', timekey, false, nil, time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}_**.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, false, :gzip, time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}.data", @i.generate_path_template('/path/to/file.*.data', timekey, true, nil, time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, true, :gzip, time_slice_format: time_slice_format) end test 'raise error to show it is a bug when path including * specified without timekey' do assert_raise RuntimeError.new("BUG: configuration error must be raised for path including '*' without timekey") do @i.generate_path_template('/path/to/file.*.log', nil, false, nil) end end data( 'day' => [86400 * 7, '%Y%m%d', '%Y-%m-%d'], 'hour' => [3600 * 6, '%Y%m%d%H', '%Y-%m-%d_%H'], 'minute' => [60 * 15, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'], ) test 'generates path with timestamp placeholder for original path without time placeholders & star with timekey, and path_suffix configured' do |data| timekey, placeholder, time_slice_format = data # with index placeholder, without compression suffix when append disabled and compression disabled assert_equal "/path/to/file.#{placeholder}_**.log", @i.generate_path_template('/path/to/file', timekey, false, nil, path_suffix: '.log') # with index placeholder, with .gz suffix when append disabled and gzip compression enabled assert_equal "/path/to/file.#{placeholder}_**.log.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip, path_suffix: '.log') # without index placeholder, without compression suffix when append enabled and compression disabled assert_equal "/path/to/file.#{placeholder}.log", @i.generate_path_template('/path/to/file', timekey, true, nil, path_suffix: '.log') # without index placeholder, with compression suffix when append enabled and gzip compression enabled assert_equal "/path/to/file.#{placeholder}.log.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip, path_suffix: '.log') # time_slice_format will be appended always if it's specified assert_equal "/path/to/file.#{time_slice_format}_**.log", @i.generate_path_template('/path/to/file', timekey, false, nil, path_suffix: '.log', time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}_**.log.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip, path_suffix: '.log', time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}.log", @i.generate_path_template('/path/to/file', timekey, true, nil, path_suffix: '.log', time_slice_format: time_slice_format) assert_equal "/path/to/file.#{time_slice_format}.log.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip, path_suffix: '.log', time_slice_format: time_slice_format) end data( 'day' => [86400, '%Y%m%d'], 'hour' => [3600, '%Y%m%d%H'], 'minute' => [60, '%Y%m%d%H%M'], ) test 'generates path with timestamp placeholder for original path without star with timekey, and path_suffix not configured' do |data| timekey, placeholder = data # with index placeholder, without compression suffix when append disabled and compression disabled assert_equal "/path/to/file.#{placeholder}_**", @i.generate_path_template('/path/to/file', timekey, false, nil) # with index placeholder, with .gz suffix when append disabled and gzip compression enabled assert_equal "/path/to/file.#{placeholder}_**.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip) # without index placeholder, without compression suffix when append enabled and compression disabled assert_equal "/path/to/file.#{placeholder}", @i.generate_path_template('/path/to/file', timekey, true, nil) # without index placeholder, with compression suffix when append enabled and gzip compression enabled assert_equal "/path/to/file.#{placeholder}.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip) end test 'generates path without adding timestamp placeholder part if original path has enough placeholders for specified timekey' do assert_equal "/path/to/file.%Y%m%d", @i.generate_path_template('/path/to/file.%Y%m%d', 86400, true, nil) assert_equal "/path/to/%Y%m%d/file", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil) assert_equal "/path/to/%Y%m%d/file_**", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil) assert_raise Fluent::ConfigError.new("insufficient timestamp placeholders in path") do @i.generate_path_template('/path/to/%Y%m/file', 86400, true, nil) end assert_raise Fluent::ConfigError.new("insufficient timestamp placeholders in path") do @i.generate_path_template('/path/to/file.%Y%m%d.log', 3600, true, nil) end assert_equal "/path/to/file.%Y%m%d_%H_**.log.gz", @i.generate_path_template('/path/to/file.%Y%m%d_%H', 7200, false, :gzip, path_suffix: '.log') assert_equal "/path/to/${tag}/file.%Y%m%d_%H_**.log.gz", @i.generate_path_template('/path/to/${tag}/file.%Y%m%d_%H', 7200, false, :gzip, path_suffix: '.log') end test 'generates path with specified time_slice_format appended even if path has sufficient timestamp placeholders' do assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H_**", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil, time_slice_format: '%Y-%m-%d_%H') assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil, time_slice_format: '%Y-%m-%d_%H') assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H_**.log", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log') assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H.log", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log') assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H.log.gz", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, :gzip, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log') end test 'generates path without timestamp placeholder when path does not include * and timekey not specified' do assert_equal '/path/to/file.log', @i.generate_path_template('/path/to/file.log', nil, true, nil) assert_equal '/path/to/file.log_**', @i.generate_path_template('/path/to/file.log', nil, false, nil) assert_equal '/path/to/file.${tag}.log_**', @i.generate_path_template('/path/to/file.${tag}.log', nil, false, nil) assert_equal '/path/to/file.${tag}_**.log', @i.generate_path_template('/path/to/file.${tag}', nil, false, nil, path_suffix: '.log') end end sub_test_case '#find_filepath_available' do setup do @tmp = File.join(TMP_DIR, 'find_filepath_test') FileUtils.mkdir_p @tmp @i = create_driver.instance end teardown do FileUtils.rm_rf @tmp end test 'raise error if argument path does not include index placeholder' do assert_raise RuntimeError.new("BUG: index placeholder not found in path: #{@tmp}/myfile") do @i.find_filepath_available("#{@tmp}/myfile") do |path| # ... end end end data( 'without suffix' => ['myfile_0', 'myfile_**'], 'with timestamp' => ['myfile_20161003_0', 'myfile_20161003_**'], 'with base suffix' => ['myfile_0.log', 'myfile_**.log'], 'with compression suffix' => ['myfile_0.log.gz', 'myfile_**.log.gz'], ) test 'returns filepath with _0 at first' do |data| expected, argument = data @i.find_filepath_available(File.join(@tmp, argument)) do |path| assert_equal File.join(@tmp, expected), path end end test 'returns filepath with index which does not exist yet' do 5.times do |i| Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a'){|f| } # open(create) and close end @i.find_filepath_available(File.join(@tmp, "exist_**.log")) do |path| assert_equal File.join(@tmp, "exist_5.log"), path end end test 'creates lock directory when with_lock is true to exclude operations of other worker process' do 5.times do |i| Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a') end Dir.mkdir(File.join(@tmp, "exist_5.log.lock")) @i.find_filepath_available(File.join(@tmp, "exist_**.log"), with_lock: true) do |path| assert Dir.exist?(File.join(@tmp, "exist_6.log.lock")) assert_equal File.join(@tmp, "exist_6.log"), path end end end end