test/plugin/test_out_kafka2.rb in fluent-plugin-kafka-0.17.0 vs test/plugin/test_out_kafka2.rb in fluent-plugin-kafka-0.17.1

- old
+ new

@@ -1,8 +1,10 @@ require 'helper' require 'fluent/test/helpers' -require 'fluent/output' +require 'fluent/test/driver/input' +require 'fluent/test/driver/output' +require 'securerandom' class Kafka2OutputTest < Test::Unit::TestCase include Fluent::Test::Helpers def setup @@ -13,12 +15,12 @@ config_element('ROOT', '', {"@type" => "kafka2"}, [ config_element('format', "", {"@type" => "json"}) ]) end - def config - base_config + config_element('ROOT', '', {"default_topic" => "kitagawakeiko", + def config(default_topic: "kitagawakeiko") + base_config + config_element('ROOT', '', {"default_topic" => default_topic, "brokers" => "localhost:9092"}, [ ]) end def create_driver(conf = config, tag='test') @@ -54,7 +56,61 @@ end def test_mutli_worker_support d = create_driver assert_equal true, d.instance.multi_workers_ready? + end + + class WriteTest < self + TOPIC_NAME = "kafka-output-#{SecureRandom.uuid}" + + INPUT_CONFIG = %[ + @type kafka + brokers localhost:9092 + format json + @label @kafka + topics #{TOPIC_NAME} + ] + + def create_target_driver(conf = INPUT_CONFIG) + Fluent::Test::Driver::Input.new(Fluent::KafkaInput).configure(conf) + end + + def setup + @kafka = Kafka.new(["localhost:9092"], client_id: 'kafka') + end + + def teardown + @kafka.delete_topic(TOPIC_NAME) + @kafka.close + end + + def test_write + target_driver = create_target_driver + expected_message = {"a" => 2} + target_driver.run(expect_records: 1, timeout: 5) do + sleep 2 + d = create_driver(config(default_topic: TOPIC_NAME)) + d.run do + d.feed("test", event_time, expected_message) + end + end + actual_messages = target_driver.events.collect { |event| event[2] } + assert_equal([expected_message], actual_messages) + end + + def test_exclude_fields + conf = config(default_topic: TOPIC_NAME) + + config_element('ROOT', '', {"exclude_fields" => "$.foo"}, []) + target_driver = create_target_driver + target_driver.run(expect_records: 1, timeout: 5) do + sleep 2 + d = create_driver(conf) + d.run do + d.feed('test', event_time, {'a' => 'b', 'foo' => 'bar', 'message' => 'test'}) + end + end + actual_messages = target_driver.events.collect { |event| event[2] } + assert_equal([{'a' => 'b', 'message' => 'test'}], actual_messages) + end end end