require "helper" require "fluent/plugin/out_mysql_fetch_and_emit.rb" class MysqlFetchAndEmitOutputTest < Test::Unit::TestCase setup do Fluent::Test.setup mysql_client.query("INSERT INTO users (id, name) VALUES (1, 'user1'), (2, 'user2'), (3, 'user3')") end teardown do mysql_client.query("TRUNCATE TABLE users") end test "mysql_connect" do results = mysql_client.query("SELECT * FROM users ORDER BY id") assert do results.to_a.map { |row| row["id"] } == [1, 2, 3] end end CONFIG = <<~CONF host 127.0.0.1 username root password "" database sample table users record_key id column_names ["id as foo", "name"] tag new_tag CONF test "configure" do driver = create_driver assert_equal("id", driver.instance.where_column_name) end sub_test_case "write" do test "single feed" do driver = create_driver driver.run do driver.feed("tag", Time.now.to_i, {"id" => "1"}) end event = driver.events[0] assert_equal("new_tag", event[0]) assert_kind_of(Fluent::EventTime, event[1]) assert_equal({"foo" => 1, "name" => "user1"}, event[2]) end test "multiple feed" do driver = create_driver driver.run do driver.feed("tag", Time.now.to_i, {"id" => 1}) driver.feed("tag", Time.now.to_i, {"id" => 3}) end event1 = driver.events[0] event2 = driver.events[1] assert_equal("new_tag", event1[0]) assert_kind_of(Fluent::EventTime, event1[1]) assert_equal({"foo" => 1, "name" => "user1"}, event1[2]) assert_equal("new_tag", event2[0]) assert_kind_of(Fluent::EventTime, event2[1]) assert_equal({"foo" => 3, "name" => "user3"}, event2[2]) end test "multiple feed with additional_condition" do driver = create_driver(CONFIG + "\n" + <<~CONF) additional_condition "name = '${tag}'" CONF driver.run do driver.feed("user1", Time.now.to_i, {"id" => 1}) driver.feed("user1", Time.now.to_i, {"id" => 3}) end assert_equal(1, driver.events.size) event1 = driver.events[0] assert_equal("new_tag", event1[0]) assert_kind_of(Fluent::EventTime, event1[1]) assert_equal({"foo" => 1, "name" => "user1"}, event1[2]) end end sub_test_case "record_matching_key" do test "mysql_record.merge(fluentd_record)" do mysql_client.query("INSERT INTO users (id, name) VALUES (4, 'user1')") driver = create_driver(CONFIG + "\n" + <<~CONF) fluentd_record_key id mysql_record_key foo CONF driver.run do driver.feed("user1", Time.now.to_i, {"id" => 1, "name" => "new_user1", "hoge" => "bar1"}) driver.feed("user1", Time.now.to_i, {"id" => 3, "hoge" => "bar3"}) driver.feed("user1", Time.now.to_i, {"id" => 4, "name" => "new_user4", "hoge" => "bar4"}) end assert_equal(3, driver.events.size) event1 = driver.events[0] event2 = driver.events[1] event3 = driver.events[2] assert_equal("new_tag", event1[0]) assert_kind_of(Fluent::EventTime, event1[1]) assert_equal({"id" => 1, "foo" => 1, "name" => "new_user1", "hoge" => "bar1"}, event1[2]) assert_equal({"id" => 3, "foo" => 3, "name" => "user3", "hoge" => "bar3"}, event2[2]) assert_equal({"id" => 4, "foo" => 4, "name" => "new_user4", "hoge" => "bar4"}, event3[2]) end test "mysql_record.merge(fluentd_record) with remove_keys" do mysql_client.query("INSERT INTO users (id, name) VALUES (4, 'user1')") driver = create_driver(CONFIG + "\n" + <<~CONF) fluentd_record_key id mysql_record_key foo remove_keys name, hoge CONF driver.run do driver.feed("user1", Time.now.to_i, {"id" => 1, "name" => "new_user1", "hoge" => "bar1"}) driver.feed("user1", Time.now.to_i, {"id" => 3, "hoge" => "bar3"}) driver.feed("user1", Time.now.to_i, {"id" => 4, "name" => "new_user4", "hoge" => "bar4"}) end assert_equal(3, driver.events.size) event1 = driver.events[0] event2 = driver.events[1] event3 = driver.events[2] assert_equal("new_tag", event1[0]) assert_kind_of(Fluent::EventTime, event1[1]) assert_equal({"id" => 1, "foo" => 1, "name" => "user1"}, event1[2]) assert_equal({"id" => 3, "foo" => 3, "name" => "user3"}, event2[2]) assert_equal({"id" => 4, "foo" => 4, "name" => "user1"}, event3[2]) end test "fluentd_record.merge(mysql_record)" do mysql_client.query("INSERT INTO users (id, name) VALUES (4, 'user1')") driver = create_driver(CONFIG + "\n" + <<~CONF) merge_priority mysql fluentd_record_key id mysql_record_key foo CONF driver.run do driver.feed("user1", Time.now.to_i, {"id" => 1, "name" => "new_user1", "hoge" => "bar1"}) driver.feed("user1", Time.now.to_i, {"id" => 3, "hoge" => "bar3"}) driver.feed("user1", Time.now.to_i, {"id" => 4, "name" => "new_user4", "hoge" => "bar4"}) end assert_equal(3, driver.events.size) event1 = driver.events[0] event2 = driver.events[1] event3 = driver.events[2] assert_equal("new_tag", event1[0]) assert_kind_of(Fluent::EventTime, event1[1]) assert_equal({"id" => 1, "foo" => 1, "name" => "user1", "hoge" => "bar1"}, event1[2]) assert_equal({"id" => 3, "foo" => 3, "name" => "user3", "hoge" => "bar3"}, event2[2]) assert_equal({"id" => 4, "foo" => 4, "name" => "user1", "hoge" => "bar4"}, event3[2]) end test "fluentd_record.merge(mysql_record) with remove_keys" do mysql_client.query("INSERT INTO users (id, name) VALUES (4, 'user1')") driver = create_driver(CONFIG + "\n" + <<~CONF) merge_priority mysql fluentd_record_key id mysql_record_key foo remove_keys name, hoge CONF driver.run do driver.feed("user1", Time.now.to_i, {"id" => 1, "name" => "new_user1", "hoge" => "bar1"}) driver.feed("user1", Time.now.to_i, {"id" => 3, "hoge" => "bar3"}) driver.feed("user1", Time.now.to_i, {"id" => 4, "name" => "new_user4", "hoge" => "bar4"}) end assert_equal(3, driver.events.size) event1 = driver.events[0] event2 = driver.events[1] event3 = driver.events[2] assert_equal("new_tag", event1[0]) assert_kind_of(Fluent::EventTime, event1[1]) assert_equal({"id" => 1, "foo" => 1, "name" => "user1"}, event1[2]) assert_equal({"id" => 3, "foo" => 3, "name" => "user3"}, event2[2]) assert_equal({"id" => 4, "foo" => 4, "name" => "user1"}, event3[2]) end end private def create_driver(conf = CONFIG) Fluent::Test::Driver::Output.new(Fluent::Plugin::MysqlFetchAndEmitOutput).configure(conf) end def mysql_client Mysql2::Client.new(host: "127.0.0.1", username: "root", password: "", database: "sample") end end