test/plugin/test_parser_avro.rb in fluent-plugin-parser-avro-0.1.0 vs test/plugin/test_parser_avro.rb in fluent-plugin-parser-avro-0.2.0

- old
+ new

@@ -74,78 +74,98 @@ } } ] } EOC - def test_parse + data("use_confluent_schema" => true, + "plain" => false) + def test_parse(data) + config = data conf = { - 'schema_json' => SCHEMA + 'schema_json' => SCHEMA, + 'use_confluent_schema' => config, } d = create_driver(conf) datum = {"username" => "foo", "age" => 42, "verified" => true} - encoded = encode_datum(datum, SCHEMA) + encoded = encode_datum(datum, SCHEMA, config) d.instance.parse(encoded) do |_time, record| assert_equal datum, record end datum = {"username" => "baz", "age" => 34} - encoded = encode_datum(datum, SCHEMA) + encoded = encode_datum(datum, SCHEMA, config) d.instance.parse(encoded) do |_time, record| assert_equal datum.merge("verified" => nil), record end end - def test_parse_with_avro_schema + data("use_confluent_schema" => true, + "plain" => false) + def test_parse_with_avro_schema(data) + config = data conf = { - 'schema_file' => File.join(__dir__, "..", "data", "user.avsc") + 'schema_file' => File.join(__dir__, "..", "data", "user.avsc"), + 'use_confluent_schema' => config, } d = create_driver(conf) datum = {"username" => "foo", "age" => 42, "verified" => true} - encoded = encode_datum(datum, SCHEMA) + encoded = encode_datum(datum, SCHEMA, config) d.instance.parse(encoded) do |_time, record| assert_equal datum, record end datum = {"username" => "baz", "age" => 34} - encoded = encode_datum(datum, SCHEMA) + encoded = encode_datum(datum, SCHEMA, config) d.instance.parse(encoded) do |_time, record| assert_equal datum.merge("verified" => nil), record end end - def test_parse_with_readers_and_writers_schema + data("use_confluent_schema" => true, + "plain" => false) + def test_parse_with_readers_and_writers_schema(data) + config = data conf = { 'writers_schema_json' => SCHEMA, 'readers_schema_json' => READERS_SCHEMA, + 'use_confluent_schema' => config, } d = create_driver(conf) datum = {"username" => "foo", "age" => 42, "verified" => true} - encoded = encode_datum(datum, SCHEMA) + encoded = encode_datum(datum, SCHEMA, config) d.instance.parse(encoded) do |_time, record| datum.delete("verified") assert_equal datum, record end end - def test_parse_with_readers_and_writers_schema_files + data("use_confluent_schema" => true, + "plain" => false) + def test_parse_with_readers_and_writers_schema_files(data) + config = data conf = { 'writers_schema_file' => File.join(__dir__, "..", "data", "writer_user.avsc"), 'readers_schema_file' => File.join(__dir__, "..", "data", "reader_user.avsc"), + 'use_confluent_schema' => config, } d = create_driver(conf) datum = {"username" => "foo", "age" => 42, "verified" => true} - encoded = encode_datum(datum, SCHEMA) + encoded = encode_datum(datum, SCHEMA, config) d.instance.parse(encoded) do |_time, record| datum.delete("verified") assert_equal datum, record end end - def test_parse_with_complex_schema + data("use_confluent_schema" => true, + "plain" => false) + def test_parse_with_complex_schema(data) + config = data conf = { 'schema_json' => COMPLEX_SCHEMA, - 'time_key' => 'time' + 'time_key' => 'time', + 'use_confluent_schema' => config, } d = create_driver(conf) time_str = "2020-09-25 15:08:09.082113 +0900" datum = { "time" => time_str, @@ -160,11 +180,11 @@ "size" => 36, "hidden" => false } } - encoded = encode_datum(datum, COMPLEX_SCHEMA) + encoded = encode_datum(datum, COMPLEX_SCHEMA, config) d.instance.parse(encoded) do |time, record| assert_equal Time.parse(time_str).to_r, time.to_r datum.delete("time") assert_equal datum, record end @@ -183,10 +203,26 @@ begin server.mount_proc('/') do |req,res| res.status = 200 res.body = 'running' end + server.mount_proc("/schemas/ids") do |req, res| + req.path =~ /^\/schemas\/ids\/([^\/]*)$/ + version = $1 + @got.push({ + version: version, + }) + if version == "1" + res.body = File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc")) + elsif version == "21" + res.body = File.read(File.join(__dir__, "..", "data", "schema-persions-value-21.avsc")) + elsif version == "41" + res.body = File.read(File.join(__dir__, "..", "data", "schema-persions-value-41.avsc")) + elsif version == "42" + res.body = File.read(File.join(__dir__, "..", "data", "schema-persions-value-42.avsc")) + end + end server.mount_proc("/subjects") do |req, res| req.path =~ /^\/subjects\/([^\/]*)\/([^\/]*)\/(.*)$/ avro_registered_name = $1 version = $3 @got.push({ @@ -202,10 +238,12 @@ res.body = File.read(File.join(__dir__, "..", "data", "persons-avro-value2.avsc")) elsif version == "3" res.body = File.read(File.join(__dir__, "..", "data", "persons-avro-value3.avsc")) elsif version == "4" res.body = File.read(File.join(__dir__, "..", "data", "persons-avro-value4.avsc")) + elsif version == "latest" + res.body = File.read(File.join(__dir__, "..", "data", "persons-avro-value4.avsc")) end end server.start ensure server.shutdown @@ -316,66 +354,112 @@ assert_equal '200', client.request_get('/subjects/persons-avro-value/versions/3').code assert_equal 4, @got.size assert_equal 'persons-avro-value', @got[3][:registered_name] assert_equal '3', @got[3][:version] + + assert_equal '200', client.request_get('/schemas/ids/1').code + assert_equal 5, @got.size + assert_nil @got[4][:registered_name] + assert_equal '1', @got[4][:version] + + assert_equal '200', client.request_get('/schemas/ids/21').code + assert_equal 6, @got.size + assert_nil @got[5][:registered_name] + assert_equal '21', @got[5][:version] + + assert_equal '200', client.request_get('/schemas/ids/41').code + assert_equal 7, @got.size + assert_nil @got[6][:registered_name] + assert_equal '41', @got[6][:version] + + assert_equal '200', client.request_get('/schemas/ids/42').code + assert_equal 8, @got.size + assert_nil @got[7][:registered_name] + assert_equal '42', @got[7][:version] end - def test_schema_url + data("use_confluent_schema" => true, + "plain" => false) + def test_schema_url(data) + config = data conf = { 'schema_url' => "http://localhost:8081/subjects/persons-avro-value/versions/1", - 'schema_url_key' => 'schema' + 'schema_url_key' => 'schema', + 'use_confluent_schema' => config, } d = create_driver(conf) datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258} - encoded = encode_datum(datum, REMOTE_SCHEMA) + encoded = encode_datum(datum, REMOTE_SCHEMA, config) d.instance.parse(encoded) do |_time, record| assert_equal datum, record end end - def test_schema_url_with_version2 + data("use_confluent_schema" => true, + "plain" => false) + def test_schema_url_with_version2(data) + config = data conf = { 'schema_url' => "http://localhost:8081/subjects/persons-avro-value/versions/2", - 'schema_url_key' => 'schema' + 'schema_url_key' => 'schema', + 'use_confluent_schema' => config, } d = create_driver(conf) datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258} - encoded = encode_datum(datum, REMOTE_SCHEMA2) + encoded = encode_datum(datum, REMOTE_SCHEMA2, config) d.instance.parse(encoded) do |_time, record| assert_equal datum.merge("verified" => false), record end end - def test_schema_registery_with_subject_url - conf = { - 'schema_registery_with_subject_url' => "http://localhost:8081/subjects/persons-avro-value/", - 'schema_url_key' => 'schema' - } + def test_confluent_registry_with_schema_version + conf = Fluent::Config::Element.new( + '', '', {'@type' => 'avro'}, [ + Fluent::Config::Element.new('confluent_registry', '', { + 'url' => 'http://localhost:8081', + 'subject' => 'persons-avro-value', + 'schema_key' => 'schema', + 'schema_version' => '1', + }, []) + ]) d = create_driver(conf) datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258} - encoded = encode_datum(datum, REMOTE_SCHEMA2) + schema = Yajl.load(File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc"))) + encoded = encode_datum(datum, schema.fetch("schema"), true, 1) d.instance.parse(encoded) do |_time, record| - assert_equal datum.merge("verified" => nil), record + assert_equal datum, record end end - def test_schema_registery_with_invalid_subject_url - conf = { - 'schema_registery_with_subject_url' => "http://localhost:8081/subjects/persons-avro-value", - 'schema_url_key' => 'schema' - } - assert_raise(Fluent::ConfigError) do - create_driver(conf) + def test_confluent_registry_with_fallback + conf = Fluent::Config::Element.new( + '', '', {'@type' => 'avro'}, [ + Fluent::Config::Element.new('confluent_registry', '', { + 'url' => 'http://localhost:8081', + 'subject' => 'persons-avro-value', + 'schema_key' => 'schema', + }, []) + ]) + d = create_driver(conf) + datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258} + schema = Yajl.load(File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc"))) + encoded = encode_datum(datum, schema.fetch("schema"), true, 1) + d.instance.parse(encoded) do |_time, record| + assert_equal datum, record end end end private - def encode_datum(datum, string_schema) + def encode_datum(datum, string_schema, use_confluent_schema = true, schema_id = 1) buffer = StringIO.new encoder = Avro::IO::BinaryEncoder.new(buffer) + if use_confluent_schema + encoder.write(Fluent::Plugin::AvroParser::MAGIC_BYTE) + encoder.write([schema_id].pack("N")) + end schema = Avro::Schema.parse(string_schema) writer = Avro::IO::DatumWriter.new(schema) writer.write(datum, encoder) buffer.rewind buffer.read