test/plugin/test_out_tdlog.rb in fluent-plugin-td-0.11.0.rc1 vs test/plugin/test_out_tdlog.rb in fluent-plugin-td-1.0.0.rc1

- old
+ new

@@ -1,28 +1,38 @@ require 'fluent/test' +require 'fluent/test/driver/output' require 'fluent/plugin/out_tdlog' require 'test_helper.rb' class TreasureDataLogOutputTest < Test::Unit::TestCase + TMP_DIR = File.dirname(__FILE__) + "/tmp" + def setup + super Fluent::Test.setup + FileUtils.rm_rf(TMP_DIR, secure: true) + FileUtils.mkdir_p(TMP_DIR) end - TMP_DIR = File.dirname(__FILE__) + "/tmp" + def teardown + super + Fluent::Engine.stop + end + BASE_CONFIG = %[ + apikey testkey + buffer_path #{TMP_DIR}/buffer + ] DEFAULT_CONFIG = %[ database test table table ] def create_driver(conf = DEFAULT_CONFIG) - config = %[ - apikey testkey - buffer_path #{TMP_DIR}/buffer - ] + conf + config = BASE_CONFIG + conf - Fluent::Test::BufferedOutputTestDriver.new(Fluent::TreasureDataLogOutput) do + Fluent::Test::Driver::Output.new(Fluent::Plugin::TreasureDataLogOutput) do def write(chunk) chunk.instance_variable_set(:@key, @key) def chunk.key @key end @@ -32,31 +42,47 @@ end def test_configure d = create_driver - {:@apikey => 'testkey', :@use_ssl => true, :@auto_create_table => true, - :@buffer_type => 'file', :@flush_interval => 300, :@use_gzip_command => false}.each { |k, v| + {:@apikey => 'testkey', :@use_ssl => true, :@auto_create_table => true, :@use_gzip_command => false}.each { |k, v| assert_equal(d.instance.instance_variable_get(k), v) } + {:@chunk_keys => ['tag'], :@flush_interval => 300, :@chunk_limit_size => Fluent::Plugin::TreasureDataLogOutput::IMPORT_SIZE_LIMIT}.each { |k, v| + assert_equal(d.instance.buffer.instance_variable_get(k), v) + } end - def test_emit + def test_configure_for_chunk_key_tag + assert_raise Fluent::ConfigError.new("'tag' must be included in <buffer ARG> when database and table are not specified") do + Fluent::Test::Driver::Output.new(Fluent::Plugin::TreasureDataLogOutput).configure(%[ + apikey testkey + <buffer []> + flush_interval 10s + path #{TMP_DIR}/buffer + </buffer> + ]) + end + end + + data('evet_time' => 'event_time', 'int_time' => 'int') + def test_emit(time_class) d = create_driver - time, records = stub_seed_values + time, records = stub_seed_values(time_class) database, table = d.instance.instance_variable_get(:@key).split(".", 2) stub_td_table_create_request(database, table) stub_td_import_request(stub_request_body(records, time), database, table) assert_rr { # mock(d.instance).gzip_by_writer(is_a(Fluent::BufferChunk), is_a(Tempfile)) causes empty request body so using dont_allow instead to check calling method # We need actual gzipped content to verify compressed body is correct or not. dont_allow(d.instance).gzip_by_command(anything, is_a(Tempfile)) - records.each { |record| - d.emit(record, time) + d.run(default_tag: 'test') { + records.each { |record| + d.feed(time, record) + } } - d.run } assert_equal('TD1 testkey', @auth_header) end @@ -67,15 +93,15 @@ stub_td_table_create_request(database, table) stub_td_import_request(stub_request_body(records, time), database, table) assert_rr { # same as test_emit dont_allow(d.instance).gzip_by_writer(anything, is_a(Tempfile)) - - records.each { |record| - d.emit(record, time) + d.run(default_tag: 'test') { + records.each { |record| + d.feed(time, record) + } } - d.run } assert_equal('TD1 testkey', @auth_header) end @@ -86,18 +112,18 @@ records << 'string' # non-hash case database, table = d.instance.instance_variable_get(:@key).split(".", 2) stub_td_table_create_request(database, table) stub_td_import_request(stub_request_body(records, time), database, table) - records.each { |record| - d.emit(record, time) + d.run(default_tag: 'test') { + d.feed_to_plugin('test', Fluent::ArrayEventStream.new(records.map { |e| [time, e] })) } - d.run - assert !d.instance.log.logs.any? { |line| - line =~ /undefined method/ - }, 'nil record should be skipped' + error_events = d.error_events(tag: 'test') + assert_equal 2, error_events.size + assert_equal nil, error_events[0][2]['record'] + assert_equal "string", error_events[1][2]['record'] end def test_emit_with_bigint_record n = 100000000000000000000000 d = create_driver @@ -109,47 +135,30 @@ stub_td_import_request(stub_request_body(records, time), database, table) test_time, test_records = stub_seed_values test_records[1]['k'] = ['hogehoge' * 1000] test_records[1]['kk'] = n - test_records.each { |record| - d.emit(record, test_time) + d.run(default_tag: 'test') { + test_records.each { |record| + d.feed(test_time, record) + } } - d.run end - def test_emit_with_event_time - omit "EventTime is not implemented with current Fluentd version" unless Fluent.const_defined?('EventTime') - - event_time_klass = Fluent.const_get('EventTime') - - event_time = event_time_klass.now - d = create_driver - _time, records = stub_seed_values - database, table = d.instance.instance_variable_get(:@key).split(".", 2) - stub_td_table_create_request(database, table) - stub_td_import_request(stub_request_body(records, event_time.to_i), database, table) - - _test_time, test_records = stub_seed_values - test_records.each { |record| - d.emit(record, event_time) - } - d.run - end - def test_emit_with_time_symbole d = create_driver time, records = stub_seed_values database, table = d.instance.instance_variable_get(:@key).split(".", 2) stub_td_table_create_request(database, table) stub_td_import_request(stub_request_body(records, time), database, table) - records.each { |record| - record[:time] = Time.now.to_i # emit removes this :time key - d.emit(record, time) + d.run(default_tag: 'test') { + records.each { |record| + record[:time] = Time.now.to_i # emit removes this :time key + d.feed(time, record) + } } - d.run assert_equal('TD1 testkey', @auth_header) end def test_emit_with_endpoint @@ -158,61 +167,70 @@ time, records = stub_seed_values database, table = d.instance.instance_variable_get(:@key).split(".", 2) stub_td_table_create_request(database, table, opts) stub_td_import_request(stub_request_body(records, time), database, table, opts) - records.each { |record| - d.emit(record, time) + d.run(default_tag: 'test') { + records.each { |record| + d.feed(time, record) + } } - d.run end def test_emit_with_too_many_keys d = create_driver(DEFAULT_CONFIG + "endpoint foo.bar.baz") opts = {:endpoint => 'foo.bar.baz'} time, records = stub_seed_values database, table = d.instance.instance_variable_get(:@key).split(".", 2) stub_td_table_create_request(database, table, opts) stub_td_import_request(stub_request_body([], time), database, table, opts) - d.emit(create_too_many_keys_record, time) - d.run + d.run(default_tag: 'test') { + d.feed(time, create_too_many_keys_record) + } - assert_equal 0, d.emits.size - assert d.instance.log.logs.select{ |line| - line =~ /Too many number of keys/ - }.size == 1, "too many keys error is not logged" + assert_equal 0, d.events.size + assert_equal 1, d.error_events.size end - # TODO: add normalized_msgpack / tag split test + sub_test_case 'tag splitting for database and table' do + def create_driver(conf = %[auto_create_table true]) + config = BASE_CONFIG + conf -## TODO invalid names are normalized -# def test_invalid_name -# d = create_driver -# d.instance.start -# -# es = Fluent::OneEventStream.new(Time.now.to_i, {}) -# chain = Fluent::NullOutputChain.instance -# assert_raise(RuntimeError) do -# d.instance.emit("test.invalid-name", es, chain) -# end -# assert_raise(RuntimeError) do -# d.instance.emit("empty", es, chain) -# end -# assert_raise(RuntimeError) do -# d.instance.emit("", es, chain) -# end -# end + Fluent::Test::Driver::Output.new(Fluent::Plugin::TreasureDataLogOutput).configure(config) + end -## TODO invalid data is ignored -# def test_invalid_data -# d = create_driver -# d.instance.start -# -# es = Fluent::OneEventStream.new(Time.now.to_i, "invalid") -# chain = Fluent::NullOutputChain.instance -# assert_nothing_raised do -# d.instance.emit("test.name", es, chain) -# end -# end + data('evet_time' => 'event_time', 'int_time' => 'int') + def test_tag_split(time_class) + d = create_driver + + time, records = stub_seed_values(time_class) + database = 'db1' + table = 'table1' + stub_td_table_create_request(database, table) + stub_td_import_request(stub_request_body(records, time), database, table) + + d.run(default_tag: 'td.db1.table1') { + records.each { |record| + d.feed(time, record) + } + } + end + + def test_tag_split_with_normalization + d = create_driver + + time, records = stub_seed_values + database = 'db_' + table = 'tb_' + stub_td_table_create_request(database, table) + stub_td_import_request(stub_request_body(records, time), database, table) + + d.run(default_tag: 'td.db.tb') { + records.each { |record| + d.feed(time, record) + } + } + end + end end