require_relative '../helper' require 'fluent/plugin/buf_file_single' require 'fluent/plugin/output' require 'fluent/unique_id' require 'fluent/system_config' require 'fluent/env' require 'fluent/test/driver/output' require 'msgpack' module FluentPluginFileSingleBufferTest class DummyOutputPlugin < Fluent::Plugin::Output Fluent::Plugin.register_output('buf_file_single_test', self) config_section :buffer do config_set_default :@type, 'file_single' end def multi_workers_ready? true end def write(chunk) # drop end end class DummyOutputMPPlugin < Fluent::Plugin::Output Fluent::Plugin.register_output('buf_file_single_mp_test', self) config_section :buffer do config_set_default :@type, 'file_single' end def formatted_to_msgpack_binary? true end def multi_workers_ready? true end def write(chunk) # drop end end end class FileSingleBufferTest < Test::Unit::TestCase def metadata(timekey: nil, tag: 'testing', variables: nil) Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables) end PATH = File.expand_path('../../tmp/buffer_file_single_dir', __FILE__) TAG_CONF = %[ @type file_single path #{PATH} ] FIELD_CONF = %[ @type file_single path #{PATH} ] setup do Fluent::Test.setup @d = nil @bufdir = PATH FileUtils.rm_rf(@bufdir) rescue nil FileUtils.mkdir_p(@bufdir) end teardown do FileUtils.rm_rf(@bufdir) rescue nil end def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::DummyOutputPlugin) Fluent::Test::Driver::Output.new(klass).configure(conf) end sub_test_case 'configuration' do test 'path has "fsb" prefix and "buf" suffix by default' do @d = create_driver p = @d.instance.buffer assert_equal File.join(@bufdir, 'fsb.*.buf'), p.path end data('text based chunk' => [FluentPluginFileSingleBufferTest::DummyOutputPlugin, :text], 'msgpack based chunk' => [FluentPluginFileSingleBufferTest::DummyOutputMPPlugin, :msgpack]) test 'detect chunk_format' do |param| klass, expected = param @d = create_driver(TAG_CONF, klass) p = @d.instance.buffer assert_equal expected, p.chunk_format end test '"prefix.*.suffix" path will be replaced with default' do @d = create_driver(%[ @type file_single path #{@bufdir}/foo.*.bar ]) p = @d.instance.buffer assert_equal File.join(@bufdir, 'fsb.*.buf'), p.path end end sub_test_case 'buffer configurations and workers' do setup do @d = FluentPluginFileSingleBufferTest::DummyOutputPlugin.new end test 'enables multi worker configuration with unexisting directory path' do FileUtils.rm_rf(@bufdir) buf_conf = config_element('buffer', '', {'path' => @bufdir}) assert_nothing_raised do Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir, 'workers' => 4) do @d.configure(config_element('ROOT', '', {}, [buf_conf])) end end end test 'enables multi worker configuration with existing directory path' do FileUtils.mkdir_p @bufdir buf_conf = config_element('buffer', '', {'path' => @bufdir}) assert_nothing_raised do Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir, 'workers' => 4) do @d.configure(config_element('ROOT', '', {}, [buf_conf])) end end end test 'enables multi worker configuration with root dir' do buf_conf = config_element('buffer', '') assert_nothing_raised do Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir, 'workers' => 4) do @d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}, [buf_conf])) end end end end test 'raise config error when using same file path' do d = FluentPluginFileSingleBufferTest::DummyOutputPlugin.new d2 = FluentPluginFileSingleBufferTest::DummyOutputPlugin.new Fluent::SystemConfig.overwrite_system_config({}) do d.configure(config_element('ROOT', '', {}, [config_element('buffer', '', { 'path' => File.join(PATH, 'foo.*.bar') })])) end any_instance_of(Fluent::Plugin::FileSingleBuffer) do |klass| stub(klass).called_in_test? { false } end err = assert_raise(Fluent::ConfigError) do Fluent::SystemConfig.overwrite_system_config({}) do d2.configure(config_element('ROOT', '', {}, [config_element('buffer', '', { 'path' => PATH })])) end end assert_match(/plugin already uses same buffer path/, err.message) end sub_test_case 'buffer plugin configured only with path' do setup do @bufpath = File.join(@bufdir, 'testbuf.*.buf') FileUtils.rm_rf(@bufdir) if File.exist?(@bufdir) @d = create_driver @p = @d.instance.buffer end teardown do if @p @p.stop unless @p.stopped? @p.before_shutdown unless @p.before_shutdown? @p.shutdown unless @p.shutdown? @p.after_shutdown unless @p.after_shutdown? @p.close unless @p.closed? @p.terminate unless @p.terminated? end end test 'this is persistent plugin' do assert @p.persistent? end test '#start creates directory for buffer chunks' do @d = create_driver @p = @d.instance.buffer FileUtils.rm_rf(@bufdir) if File.exist?(@bufdir) assert !File.exist?(@bufdir) @p.start assert File.exist?(@bufdir) assert { File.stat(@bufdir).mode.to_s(8).end_with?('755') } end test '#start creates directory for buffer chunks with specified permission' do omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? @d = create_driver(%[ @type file_single path #{PATH} dir_permission 700 ]) @p = @d.instance.buffer FileUtils.rm_rf(@bufdir) if File.exist?(@bufdir) assert !File.exist?(@bufdir) @p.start assert File.exist?(@bufdir) assert { File.stat(@bufdir).mode.to_s(8).end_with?('700') } end test '#start creates directory for buffer chunks with specified permission via system config' do omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? sysconf = {'dir_permission' => '700'} Fluent::SystemConfig.overwrite_system_config(sysconf) do @d = create_driver @p = @d.instance.buffer FileUtils.rm_r @bufdir if File.exist?(@bufdir) assert !File.exist?(@bufdir) @p.start assert File.exist?(@bufdir) assert { File.stat(@bufdir).mode.to_s(8).end_with?('700') } end end test '#generate_chunk generates blank file chunk on path with unique_id and tag' do FileUtils.mkdir_p(@bufdir) unless File.exist?(@bufdir) m1 = metadata() c1 = @p.generate_chunk(m1) assert c1.is_a? Fluent::Plugin::Buffer::FileSingleChunk assert_equal m1, c1.metadata assert c1.empty? assert_equal :unstaged, c1.state assert_equal Fluent::DEFAULT_FILE_PERMISSION, c1.permission assert_equal File.join(@bufdir, "fsb.testing.b#{Fluent::UniqueId.hex(c1.unique_id)}.buf"), c1.path assert{ File.stat(c1.path).mode.to_s(8).end_with?('644') } c1.purge end test '#generate_chunk generates blank file chunk on path with unique_id and field key' do FileUtils.mkdir_p(@bufdir) unless File.exist?(@bufdir) @d = create_driver(FIELD_CONF) @p = @d.instance.buffer m1 = metadata(tag: nil, variables: {:k => 'foo_bar'}) c1 = @p.generate_chunk(m1) assert c1.is_a? Fluent::Plugin::Buffer::FileSingleChunk assert_equal m1, c1.metadata assert c1.empty? assert_equal :unstaged, c1.state assert_equal Fluent::DEFAULT_FILE_PERMISSION, c1.permission assert_equal File.join(@bufdir, "fsb.foo_bar.b#{Fluent::UniqueId.hex(c1.unique_id)}.buf"), c1.path c1.purge end test '#generate_chunk generates blank file chunk with specified permission' do omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? @d = create_driver(%[ @type file_single path #{PATH} file_permission 600 ]) @p = @d.instance.buffer FileUtils.rm_r @bufdir if File.exist?(@bufdir) assert !File.exist?(@bufdir) @p.start m = metadata() c = @p.generate_chunk(m) assert c.is_a? Fluent::Plugin::Buffer::FileSingleChunk assert_equal m, c.metadata assert c.empty? assert_equal :unstaged, c.state assert_equal 0600, c.permission assert_equal File.join(@bufdir, "fsb.testing.b#{Fluent::UniqueId.hex(c.unique_id)}.buf"), c.path assert{ File.stat(c.path).mode.to_s(8).end_with?('600') } c.purge end test '#generate_chunk generates blank file chunk with specified permission with system_config' do omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? @d = create_driver(%[ @type file_single path #{PATH} ]) @p = @d.instance.buffer FileUtils.rm_r @bufdir if File.exist?(@bufdir) assert !File.exist?(@bufdir) @p.start m = metadata() c = nil Fluent::SystemConfig.overwrite_system_config("file_permission" => "700") do c = @p.generate_chunk(m) end assert c.is_a? Fluent::Plugin::Buffer::FileSingleChunk assert_equal m, c.metadata assert c.empty? assert_equal :unstaged, c.state assert_equal 0700, c.permission assert_equal File.join(@bufdir, "fsb.testing.b#{Fluent::UniqueId.hex(c.unique_id)}.buf"), c.path assert{ File.stat(c.path).mode.to_s(8).end_with?('700') } c.purge end end sub_test_case 'configured with system root directory and plugin @id' do setup do @root_dir = File.expand_path('../../tmp/buffer_file_single_root', __FILE__) FileUtils.rm_rf(@root_dir) @d = FluentPluginFileSingleBufferTest::DummyOutputPlugin.new @p = nil end teardown do if @p @p.stop unless @p.stopped? @p.before_shutdown unless @p.before_shutdown? @p.shutdown unless @p.shutdown? @p.after_shutdown unless @p.after_shutdown? @p.close unless @p.closed? @p.terminate unless @p.terminated? end end test '#start creates directory for buffer chunks' do Fluent::SystemConfig.overwrite_system_config('root_dir' => @root_dir) do @d.configure(config_element('ROOT', '', {'@id' => 'dummy_output_with_buf'}, [config_element('buffer', '', {})])) @p = @d.buffer end expected_buffer_path = File.join(@root_dir, 'worker0', 'dummy_output_with_buf', 'buffer', "fsb.*.buf") expected_buffer_dir = File.dirname(expected_buffer_path) assert_equal expected_buffer_path, @d.buffer.path assert_false Dir.exist?(expected_buffer_dir) @p.start assert Dir.exist?(expected_buffer_dir) end end sub_test_case 'buffer plugin configuration errors' do data('tag and key' => 'tag,key', 'multiple keys' => 'key1,key2') test 'invalid chunk keys' do |param| assert_raise Fluent::ConfigError do @d = create_driver(%[ @type file_single path #{PATH} calc_num_records false ]) end end test 'path is not specified' do assert_raise Fluent::ConfigError do @d = create_driver(%[ @type file_single ]) end end end sub_test_case 'there are no existing file chunks' do setup do FileUtils.rm_rf(@bufdir) if File.exist?(@bufdir) @d = create_driver @p = @d.instance.buffer @p.start end teardown do if @p @p.stop unless @p.stopped? @p.before_shutdown unless @p.before_shutdown? @p.shutdown unless @p.shutdown? @p.after_shutdown unless @p.after_shutdown? @p.close unless @p.closed? @p.terminate unless @p.terminated? end if @bufdir Dir.glob(File.join(@bufdir, '*')).each do |path| next if ['.', '..'].include?(File.basename(path)) File.delete(path) end end end test '#resume returns empty buffer state' do ary = @p.resume assert_equal({}, ary[0]) assert_equal([], ary[1]) end end sub_test_case 'there are some existing file chunks' do setup do @c1id = Fluent::UniqueId.generate p1 = File.join(@bufdir, "fsb.testing.q#{Fluent::UniqueId.hex(@c1id)}.buf") File.open(p1, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n" end t = Time.now - 50000 File.utime(t, t, p1) @c2id = Fluent::UniqueId.generate p2 = File.join(@bufdir, "fsb.testing.q#{Fluent::UniqueId.hex(@c2id)}.buf") File.open(p2, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 13:59:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 13:59:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 13:59:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" end t = Time.now - 40000 File.utime(t, t, p2) @c3id = Fluent::UniqueId.generate p3 = File.join(@bufdir, "fsb.testing.b#{Fluent::UniqueId.hex(@c3id)}.buf") File.open(p3, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n" end @c4id = Fluent::UniqueId.generate p4 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(@c4id)}.buf") File.open(p4, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" end end teardown do if @p @p.stop unless @p.stopped? @p.before_shutdown unless @p.before_shutdown? @p.shutdown unless @p.shutdown? @p.after_shutdown unless @p.after_shutdown? @p.close unless @p.closed? @p.terminate unless @p.terminated? end if @bufdir Dir.glob(File.join(@bufdir, '*')).each do |path| next if ['.', '..'].include?(File.basename(path)) File.delete(path) end end end test '#resume returns staged/queued chunks with metadata' do @d = create_driver @p = @d.instance.buffer @p.start assert_equal 2, @p.stage.size assert_equal 2, @p.queue.size stage = @p.stage m3 = metadata() assert_equal @c3id, stage[m3].unique_id assert_equal 4, stage[m3].size assert_equal :staged, stage[m3].state m4 = metadata(tag: 'foo') assert_equal @c4id, stage[m4].unique_id assert_equal 3, stage[m4].size assert_equal :staged, stage[m4].state end test '#resume returns queued chunks ordered by last modified time (FIFO)' do @d = create_driver @p = @d.instance.buffer @p.start assert_equal 2, @p.stage.size assert_equal 2, @p.queue.size queue = @p.queue assert{ queue[0].modified_at <= queue[1].modified_at } assert_equal @c1id, queue[0].unique_id assert_equal :queued, queue[0].state assert_equal 'testing', queue[0].metadata.tag assert_nil queue[0].metadata.variables assert_equal 4, queue[0].size assert_equal @c2id, queue[1].unique_id assert_equal :queued, queue[1].state assert_equal 'testing', queue[1].metadata.tag assert_nil queue[1].metadata.variables assert_equal 3, queue[1].size end test '#resume returns staged/queued chunks but skips size calculation by calc_num_records' do @d = create_driver(%[ @type file_single path #{PATH} calc_num_records false ]) @p = @d.instance.buffer @p.start assert_equal 2, @p.stage.size assert_equal 2, @p.queue.size stage = @p.stage m3 = metadata() assert_equal @c3id, stage[m3].unique_id assert_equal 0, stage[m3].size assert_equal :staged, stage[m3].state m4 = metadata(tag: 'foo') assert_equal @c4id, stage[m4].unique_id assert_equal 0, stage[m4].size assert_equal :staged, stage[m4].state end end sub_test_case 'there are some existing file chunks with placeholders path' do setup do @buf_ph_dir = File.expand_path('../../tmp/buffer_${test}_file_single_dir', __FILE__) FileUtils.rm_rf(@buf_ph_dir) FileUtils.mkdir_p(@buf_ph_dir) @c1id = Fluent::UniqueId.generate p1 = File.join(@buf_ph_dir, "fsb.testing.q#{Fluent::UniqueId.hex(@c1id)}.buf") File.open(p1, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" end t = Time.now - 50000 File.utime(t, t, p1) @c2id = Fluent::UniqueId.generate p2 = File.join(@buf_ph_dir, "fsb.testing.b#{Fluent::UniqueId.hex(@c2id)}.buf") File.open(p2, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" end end teardown do if @p @p.stop unless @p.stopped? @p.before_shutdown unless @p.before_shutdown? @p.shutdown unless @p.shutdown? @p.after_shutdown unless @p.after_shutdown? @p.close unless @p.closed? @p.terminate unless @p.terminated? end FileUtils.rm_rf(@buf_ph_dir) end test '#resume returns staged/queued chunks with metadata' do @d = create_driver(%[ @type file_single path #{@buf_ph_dir} ]) @p = @d.instance.buffer @p.start assert_equal 1, @p.stage.size assert_equal 1, @p.queue.size end end sub_test_case 'there are some existing msgpack file chunks' do setup do packer = Fluent::MessagePackFactory.packer @c1id = Fluent::UniqueId.generate p1 = File.join(@bufdir, "fsb.testing.q#{Fluent::UniqueId.hex(@c1id)}.buf") File.open(p1, 'wb') do |f| packer.write(["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}]) packer.write(["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}]) packer.write(["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}]) packer.write(["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}]) f.write packer.full_pack end t = Time.now - 50000 File.utime(t, t, p1) @c2id = Fluent::UniqueId.generate p2 = File.join(@bufdir, "fsb.testing.q#{Fluent::UniqueId.hex(@c2id)}.buf") File.open(p2, 'wb') do |f| packer.write(["t1.test", event_time('2016-04-17 13:59:15 -0700').to_i, {"message" => "yay"}]) packer.write(["t2.test", event_time('2016-04-17 13:59:17 -0700').to_i, {"message" => "yay"}]) packer.write(["t3.test", event_time('2016-04-17 13:59:21 -0700').to_i, {"message" => "yay"}]) f.write packer.full_pack end t = Time.now - 40000 File.utime(t, t, p2) @c3id = Fluent::UniqueId.generate p3 = File.join(@bufdir, "fsb.testing.b#{Fluent::UniqueId.hex(@c3id)}.buf") File.open(p3, 'wb') do |f| packer.write(["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}]) packer.write(["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}]) packer.write(["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}]) packer.write(["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}]) f.write packer.full_pack end @c4id = Fluent::UniqueId.generate p4 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(@c4id)}.buf") File.open(p4, 'wb') do |f| packer.write(["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}]) packer.write(["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}]) packer.write(["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}]) f.write packer.full_pack end end teardown do if @p @p.stop unless @p.stopped? @p.before_shutdown unless @p.before_shutdown? @p.shutdown unless @p.shutdown? @p.after_shutdown unless @p.after_shutdown? @p.close unless @p.closed? @p.terminate unless @p.terminated? end if @bufdir Dir.glob(File.join(@bufdir, '*')).each do |path| next if ['.', '..'].include?(File.basename(path)) File.delete(path) end end end test '#resume returns staged/queued chunks with msgpack format' do @d = create_driver(%[ @type file_single path #{PATH} chunk_format msgpack ]) @p = @d.instance.buffer @p.start assert_equal 2, @p.stage.size assert_equal 2, @p.queue.size stage = @p.stage m3 = metadata() assert_equal @c3id, stage[m3].unique_id assert_equal 4, stage[m3].size assert_equal :staged, stage[m3].state m4 = metadata(tag: 'foo') assert_equal @c4id, stage[m4].unique_id assert_equal 3, stage[m4].size assert_equal :staged, stage[m4].state end end sub_test_case 'there are some existing file chunks, both in specified path and per-worker directory under specified path, configured as multi workers' do setup do @worker0_dir = File.join(@bufdir, "worker0") @worker1_dir = File.join(@bufdir, "worker1") FileUtils.rm_rf(@bufdir) FileUtils.mkdir_p(@worker0_dir) FileUtils.mkdir_p(@worker1_dir) @bufdir_chunk_1 = Fluent::UniqueId.generate bc1 = File.join(@bufdir, "fsb.testing.q#{Fluent::UniqueId.hex(@bufdir_chunk_1)}.buf") File.open(bc1, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n" end @bufdir_chunk_2 = Fluent::UniqueId.generate bc2 = File.join(@bufdir, "fsb.testing.q#{Fluent::UniqueId.hex(@bufdir_chunk_2)}.buf") File.open(bc2, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n" end @worker_dir_chunk_1 = Fluent::UniqueId.generate wc0_1 = File.join(@worker0_dir, "fsb.testing.q#{Fluent::UniqueId.hex(@worker_dir_chunk_1)}.buf") wc1_1 = File.join(@worker1_dir, "fsb.testing.q#{Fluent::UniqueId.hex(@worker_dir_chunk_1)}.buf") [wc0_1, wc1_1].each do |chunk_path| File.open(chunk_path, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 13:59:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 13:59:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 13:59:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" end end @worker_dir_chunk_2 = Fluent::UniqueId.generate wc0_2 = File.join(@worker0_dir, "fsb.testing.b#{Fluent::UniqueId.hex(@worker_dir_chunk_2)}.buf") wc1_2 = File.join(@worker1_dir, "fsb.foo.b#{Fluent::UniqueId.hex(@worker_dir_chunk_2)}.buf") [wc0_2, wc1_2].each do |chunk_path| File.open(chunk_path, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n" end end @worker_dir_chunk_3 = Fluent::UniqueId.generate wc0_3 = File.join(@worker0_dir, "fsb.bar.b#{Fluent::UniqueId.hex(@worker_dir_chunk_3)}.buf") wc1_3 = File.join(@worker1_dir, "fsb.baz.b#{Fluent::UniqueId.hex(@worker_dir_chunk_3)}.buf") [wc0_3, wc1_3].each do |chunk_path| File.open(chunk_path, 'wb') do |f| f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" end end end teardown do if @p @p.stop unless @p.stopped? @p.before_shutdown unless @p.before_shutdown? @p.shutdown unless @p.shutdown? @p.after_shutdown unless @p.after_shutdown? @p.close unless @p.closed? @p.terminate unless @p.terminated? end end test 'worker(id=0) #resume returns staged/queued chunks with metadata, not only in worker dir, including the directory specified by path' do ENV['SERVERENGINE_WORKER_ID'] = '0' buf_conf = config_element('buffer', '', {'path' => @bufdir}) @d = FluentPluginFileSingleBufferTest::DummyOutputPlugin.new with_worker_config(workers: 2, worker_id: 0) do @d.configure(config_element('output', '', {}, [buf_conf])) end @d.start @p = @d.buffer assert_equal 2, @p.stage.size assert_equal 3, @p.queue.size stage = @p.stage m1 = metadata(tag: 'testing') assert_equal @worker_dir_chunk_2, stage[m1].unique_id assert_equal 4, stage[m1].size assert_equal :staged, stage[m1].state m2 = metadata(tag: 'bar') assert_equal @worker_dir_chunk_3, stage[m2].unique_id assert_equal 3, stage[m2].size assert_equal :staged, stage[m2].state queue = @p.queue assert_equal [@bufdir_chunk_1, @bufdir_chunk_2, @worker_dir_chunk_1].sort, queue.map(&:unique_id).sort assert_equal [3, 4, 4], queue.map(&:size).sort assert_equal [:queued, :queued, :queued], queue.map(&:state) end test 'worker(id=1) #resume returns staged/queued chunks with metadata, only in worker dir' do buf_conf = config_element('buffer', '', {'path' => @bufdir}) @d = FluentPluginFileSingleBufferTest::DummyOutputPlugin.new with_worker_config(workers: 2, worker_id: 1) do @d.configure(config_element('output', '', {}, [buf_conf])) end @d.start @p = @d.buffer assert_equal 2, @p.stage.size assert_equal 1, @p.queue.size stage = @p.stage m1 = metadata(tag: 'foo') assert_equal @worker_dir_chunk_2, stage[m1].unique_id assert_equal 4, stage[m1].size assert_equal :staged, stage[m1].state m2 = metadata(tag: 'baz') assert_equal @worker_dir_chunk_3, stage[m2].unique_id assert_equal 3, stage[m2].size assert_equal :staged, stage[m2].state queue = @p.queue assert_equal @worker_dir_chunk_1, queue[0].unique_id assert_equal 3, queue[0].size assert_equal :queued, queue[0].state end end sub_test_case 'there are existing broken file chunks' do setup do FileUtils.rm_rf(@bufdir) rescue nil FileUtils.mkdir_p(@bufdir) end teardown do return unless @p @p.stop unless @p.stopped? @p.before_shutdown unless @p.before_shutdown? @p.shutdown unless @p.shutdown? @p.after_shutdown unless @p.after_shutdown? @p.close unless @p.closed? @p.terminate unless @p.terminated? end test '#resume backups empty chunk' do id_output = 'backup_test' @d = create_driver(%[ @id #{id_output} @type file_single path #{PATH} ]) @p = @d.instance.buffer c1id = Fluent::UniqueId.generate p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf") File.open(p1, 'wb') { |f| } # create empty chunk file Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do @p.start end assert { not File.exist?(p1) } assert { File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") } end test '#resume throws away broken chunk with disable_chunk_backup' do id_output = 'backup_test' @d = create_driver(%[ @id #{id_output} @type file_single path #{PATH} disable_chunk_backup true ]) @p = @d.instance.buffer c1id = Fluent::UniqueId.generate p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf") File.open(p1, 'wb') { |f| } # create empty chunk file Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do @p.start end assert { not File.exist?(p1) } assert { not File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") } end end end