test/plugin/test_out_tdlog.rb in fluent-plugin-td-0.11.0.rc1 vs test/plugin/test_out_tdlog.rb in fluent-plugin-td-1.0.0.rc1
- old
+ new
@@ -1,28 +1,38 @@
require 'fluent/test'
+require 'fluent/test/driver/output'
require 'fluent/plugin/out_tdlog'
require 'test_helper.rb'
class TreasureDataLogOutputTest < Test::Unit::TestCase
+ TMP_DIR = File.dirname(__FILE__) + "/tmp"
+
def setup
+ super
Fluent::Test.setup
+ FileUtils.rm_rf(TMP_DIR, secure: true)
+ FileUtils.mkdir_p(TMP_DIR)
end
- TMP_DIR = File.dirname(__FILE__) + "/tmp"
+ def teardown
+ super
+ Fluent::Engine.stop
+ end
+ BASE_CONFIG = %[
+ apikey testkey
+ buffer_path #{TMP_DIR}/buffer
+ ]
DEFAULT_CONFIG = %[
database test
table table
]
def create_driver(conf = DEFAULT_CONFIG)
- config = %[
- apikey testkey
- buffer_path #{TMP_DIR}/buffer
- ] + conf
+ config = BASE_CONFIG + conf
- Fluent::Test::BufferedOutputTestDriver.new(Fluent::TreasureDataLogOutput) do
+ Fluent::Test::Driver::Output.new(Fluent::Plugin::TreasureDataLogOutput) do
def write(chunk)
chunk.instance_variable_set(:@key, @key)
def chunk.key
@key
end
@@ -32,31 +42,47 @@
end
def test_configure
d = create_driver
- {:@apikey => 'testkey', :@use_ssl => true, :@auto_create_table => true,
- :@buffer_type => 'file', :@flush_interval => 300, :@use_gzip_command => false}.each { |k, v|
+ {:@apikey => 'testkey', :@use_ssl => true, :@auto_create_table => true, :@use_gzip_command => false}.each { |k, v|
assert_equal(d.instance.instance_variable_get(k), v)
}
+ {:@chunk_keys => ['tag'], :@flush_interval => 300, :@chunk_limit_size => Fluent::Plugin::TreasureDataLogOutput::IMPORT_SIZE_LIMIT}.each { |k, v|
+ assert_equal(d.instance.buffer.instance_variable_get(k), v)
+ }
end
- def test_emit
+ def test_configure_for_chunk_key_tag
+ assert_raise Fluent::ConfigError.new("'tag' must be included in <buffer ARG> when database and table are not specified") do
+ Fluent::Test::Driver::Output.new(Fluent::Plugin::TreasureDataLogOutput).configure(%[
+ apikey testkey
+ <buffer []>
+ flush_interval 10s
+ path #{TMP_DIR}/buffer
+ </buffer>
+ ])
+ end
+ end
+
+ data('evet_time' => 'event_time', 'int_time' => 'int')
+ def test_emit(time_class)
d = create_driver
- time, records = stub_seed_values
+ time, records = stub_seed_values(time_class)
database, table = d.instance.instance_variable_get(:@key).split(".", 2)
stub_td_table_create_request(database, table)
stub_td_import_request(stub_request_body(records, time), database, table)
assert_rr {
# mock(d.instance).gzip_by_writer(is_a(Fluent::BufferChunk), is_a(Tempfile)) causes empty request body so using dont_allow instead to check calling method
# We need actual gzipped content to verify compressed body is correct or not.
dont_allow(d.instance).gzip_by_command(anything, is_a(Tempfile))
- records.each { |record|
- d.emit(record, time)
+ d.run(default_tag: 'test') {
+ records.each { |record|
+ d.feed(time, record)
+ }
}
- d.run
}
assert_equal('TD1 testkey', @auth_header)
end
@@ -67,15 +93,15 @@
stub_td_table_create_request(database, table)
stub_td_import_request(stub_request_body(records, time), database, table)
assert_rr {
# same as test_emit
dont_allow(d.instance).gzip_by_writer(anything, is_a(Tempfile))
-
- records.each { |record|
- d.emit(record, time)
+ d.run(default_tag: 'test') {
+ records.each { |record|
+ d.feed(time, record)
+ }
}
- d.run
}
assert_equal('TD1 testkey', @auth_header)
end
@@ -86,18 +112,18 @@
records << 'string' # non-hash case
database, table = d.instance.instance_variable_get(:@key).split(".", 2)
stub_td_table_create_request(database, table)
stub_td_import_request(stub_request_body(records, time), database, table)
- records.each { |record|
- d.emit(record, time)
+ d.run(default_tag: 'test') {
+ d.feed_to_plugin('test', Fluent::ArrayEventStream.new(records.map { |e| [time, e] }))
}
- d.run
- assert !d.instance.log.logs.any? { |line|
- line =~ /undefined method/
- }, 'nil record should be skipped'
+ error_events = d.error_events(tag: 'test')
+ assert_equal 2, error_events.size
+ assert_equal nil, error_events[0][2]['record']
+ assert_equal "string", error_events[1][2]['record']
end
def test_emit_with_bigint_record
n = 100000000000000000000000
d = create_driver
@@ -109,47 +135,30 @@
stub_td_import_request(stub_request_body(records, time), database, table)
test_time, test_records = stub_seed_values
test_records[1]['k'] = ['hogehoge' * 1000]
test_records[1]['kk'] = n
- test_records.each { |record|
- d.emit(record, test_time)
+ d.run(default_tag: 'test') {
+ test_records.each { |record|
+ d.feed(test_time, record)
+ }
}
- d.run
end
- def test_emit_with_event_time
- omit "EventTime is not implemented with current Fluentd version" unless Fluent.const_defined?('EventTime')
-
- event_time_klass = Fluent.const_get('EventTime')
-
- event_time = event_time_klass.now
- d = create_driver
- _time, records = stub_seed_values
- database, table = d.instance.instance_variable_get(:@key).split(".", 2)
- stub_td_table_create_request(database, table)
- stub_td_import_request(stub_request_body(records, event_time.to_i), database, table)
-
- _test_time, test_records = stub_seed_values
- test_records.each { |record|
- d.emit(record, event_time)
- }
- d.run
- end
-
def test_emit_with_time_symbole
d = create_driver
time, records = stub_seed_values
database, table = d.instance.instance_variable_get(:@key).split(".", 2)
stub_td_table_create_request(database, table)
stub_td_import_request(stub_request_body(records, time), database, table)
- records.each { |record|
- record[:time] = Time.now.to_i # emit removes this :time key
- d.emit(record, time)
+ d.run(default_tag: 'test') {
+ records.each { |record|
+ record[:time] = Time.now.to_i # emit removes this :time key
+ d.feed(time, record)
+ }
}
- d.run
assert_equal('TD1 testkey', @auth_header)
end
def test_emit_with_endpoint
@@ -158,61 +167,70 @@
time, records = stub_seed_values
database, table = d.instance.instance_variable_get(:@key).split(".", 2)
stub_td_table_create_request(database, table, opts)
stub_td_import_request(stub_request_body(records, time), database, table, opts)
- records.each { |record|
- d.emit(record, time)
+ d.run(default_tag: 'test') {
+ records.each { |record|
+ d.feed(time, record)
+ }
}
- d.run
end
def test_emit_with_too_many_keys
d = create_driver(DEFAULT_CONFIG + "endpoint foo.bar.baz")
opts = {:endpoint => 'foo.bar.baz'}
time, records = stub_seed_values
database, table = d.instance.instance_variable_get(:@key).split(".", 2)
stub_td_table_create_request(database, table, opts)
stub_td_import_request(stub_request_body([], time), database, table, opts)
- d.emit(create_too_many_keys_record, time)
- d.run
+ d.run(default_tag: 'test') {
+ d.feed(time, create_too_many_keys_record)
+ }
- assert_equal 0, d.emits.size
- assert d.instance.log.logs.select{ |line|
- line =~ /Too many number of keys/
- }.size == 1, "too many keys error is not logged"
+ assert_equal 0, d.events.size
+ assert_equal 1, d.error_events.size
end
- # TODO: add normalized_msgpack / tag split test
+ sub_test_case 'tag splitting for database and table' do
+ def create_driver(conf = %[auto_create_table true])
+ config = BASE_CONFIG + conf
-## TODO invalid names are normalized
-# def test_invalid_name
-# d = create_driver
-# d.instance.start
-#
-# es = Fluent::OneEventStream.new(Time.now.to_i, {})
-# chain = Fluent::NullOutputChain.instance
-# assert_raise(RuntimeError) do
-# d.instance.emit("test.invalid-name", es, chain)
-# end
-# assert_raise(RuntimeError) do
-# d.instance.emit("empty", es, chain)
-# end
-# assert_raise(RuntimeError) do
-# d.instance.emit("", es, chain)
-# end
-# end
+ Fluent::Test::Driver::Output.new(Fluent::Plugin::TreasureDataLogOutput).configure(config)
+ end
-## TODO invalid data is ignored
-# def test_invalid_data
-# d = create_driver
-# d.instance.start
-#
-# es = Fluent::OneEventStream.new(Time.now.to_i, "invalid")
-# chain = Fluent::NullOutputChain.instance
-# assert_nothing_raised do
-# d.instance.emit("test.name", es, chain)
-# end
-# end
+ data('evet_time' => 'event_time', 'int_time' => 'int')
+ def test_tag_split(time_class)
+ d = create_driver
+
+ time, records = stub_seed_values(time_class)
+ database = 'db1'
+ table = 'table1'
+ stub_td_table_create_request(database, table)
+ stub_td_import_request(stub_request_body(records, time), database, table)
+
+ d.run(default_tag: 'td.db1.table1') {
+ records.each { |record|
+ d.feed(time, record)
+ }
+ }
+ end
+
+ def test_tag_split_with_normalization
+ d = create_driver
+
+ time, records = stub_seed_values
+ database = 'db_'
+ table = 'tb_'
+ stub_td_table_create_request(database, table)
+ stub_td_import_request(stub_request_body(records, time), database, table)
+
+ d.run(default_tag: 'td.db.tb') {
+ records.each { |record|
+ d.feed(time, record)
+ }
+ }
+ end
+ end
end