test/plugin/test_parser_avro.rb in fluent-plugin-parser-avro-0.1.0 vs test/plugin/test_parser_avro.rb in fluent-plugin-parser-avro-0.2.0
- old
+ new
@@ -74,78 +74,98 @@
}
} ]
}
EOC
- def test_parse
+ data("use_confluent_schema" => true,
+ "plain" => false)
+ def test_parse(data)
+ config = data
conf = {
- 'schema_json' => SCHEMA
+ 'schema_json' => SCHEMA,
+ 'use_confluent_schema' => config,
}
d = create_driver(conf)
datum = {"username" => "foo", "age" => 42, "verified" => true}
- encoded = encode_datum(datum, SCHEMA)
+ encoded = encode_datum(datum, SCHEMA, config)
d.instance.parse(encoded) do |_time, record|
assert_equal datum, record
end
datum = {"username" => "baz", "age" => 34}
- encoded = encode_datum(datum, SCHEMA)
+ encoded = encode_datum(datum, SCHEMA, config)
d.instance.parse(encoded) do |_time, record|
assert_equal datum.merge("verified" => nil), record
end
end
- def test_parse_with_avro_schema
+ data("use_confluent_schema" => true,
+ "plain" => false)
+ def test_parse_with_avro_schema(data)
+ config = data
conf = {
- 'schema_file' => File.join(__dir__, "..", "data", "user.avsc")
+ 'schema_file' => File.join(__dir__, "..", "data", "user.avsc"),
+ 'use_confluent_schema' => config,
}
d = create_driver(conf)
datum = {"username" => "foo", "age" => 42, "verified" => true}
- encoded = encode_datum(datum, SCHEMA)
+ encoded = encode_datum(datum, SCHEMA, config)
d.instance.parse(encoded) do |_time, record|
assert_equal datum, record
end
datum = {"username" => "baz", "age" => 34}
- encoded = encode_datum(datum, SCHEMA)
+ encoded = encode_datum(datum, SCHEMA, config)
d.instance.parse(encoded) do |_time, record|
assert_equal datum.merge("verified" => nil), record
end
end
- def test_parse_with_readers_and_writers_schema
+ data("use_confluent_schema" => true,
+ "plain" => false)
+ def test_parse_with_readers_and_writers_schema(data)
+ config = data
conf = {
'writers_schema_json' => SCHEMA,
'readers_schema_json' => READERS_SCHEMA,
+ 'use_confluent_schema' => config,
}
d = create_driver(conf)
datum = {"username" => "foo", "age" => 42, "verified" => true}
- encoded = encode_datum(datum, SCHEMA)
+ encoded = encode_datum(datum, SCHEMA, config)
d.instance.parse(encoded) do |_time, record|
datum.delete("verified")
assert_equal datum, record
end
end
- def test_parse_with_readers_and_writers_schema_files
+ data("use_confluent_schema" => true,
+ "plain" => false)
+ def test_parse_with_readers_and_writers_schema_files(data)
+ config = data
conf = {
'writers_schema_file' => File.join(__dir__, "..", "data", "writer_user.avsc"),
'readers_schema_file' => File.join(__dir__, "..", "data", "reader_user.avsc"),
+ 'use_confluent_schema' => config,
}
d = create_driver(conf)
datum = {"username" => "foo", "age" => 42, "verified" => true}
- encoded = encode_datum(datum, SCHEMA)
+ encoded = encode_datum(datum, SCHEMA, config)
d.instance.parse(encoded) do |_time, record|
datum.delete("verified")
assert_equal datum, record
end
end
- def test_parse_with_complex_schema
+ data("use_confluent_schema" => true,
+ "plain" => false)
+ def test_parse_with_complex_schema(data)
+ config = data
conf = {
'schema_json' => COMPLEX_SCHEMA,
- 'time_key' => 'time'
+ 'time_key' => 'time',
+ 'use_confluent_schema' => config,
}
d = create_driver(conf)
time_str = "2020-09-25 15:08:09.082113 +0900"
datum = {
"time" => time_str,
@@ -160,11 +180,11 @@
"size" => 36,
"hidden" => false
}
}
- encoded = encode_datum(datum, COMPLEX_SCHEMA)
+ encoded = encode_datum(datum, COMPLEX_SCHEMA, config)
d.instance.parse(encoded) do |time, record|
assert_equal Time.parse(time_str).to_r, time.to_r
datum.delete("time")
assert_equal datum, record
end
@@ -183,10 +203,26 @@
begin
server.mount_proc('/') do |req,res|
res.status = 200
res.body = 'running'
end
+ server.mount_proc("/schemas/ids") do |req, res|
+ req.path =~ /^\/schemas\/ids\/([^\/]*)$/
+ version = $1
+ @got.push({
+ version: version,
+ })
+ if version == "1"
+ res.body = File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc"))
+ elsif version == "21"
+ res.body = File.read(File.join(__dir__, "..", "data", "schema-persions-value-21.avsc"))
+ elsif version == "41"
+ res.body = File.read(File.join(__dir__, "..", "data", "schema-persions-value-41.avsc"))
+ elsif version == "42"
+ res.body = File.read(File.join(__dir__, "..", "data", "schema-persions-value-42.avsc"))
+ end
+ end
server.mount_proc("/subjects") do |req, res|
req.path =~ /^\/subjects\/([^\/]*)\/([^\/]*)\/(.*)$/
avro_registered_name = $1
version = $3
@got.push({
@@ -202,10 +238,12 @@
res.body = File.read(File.join(__dir__, "..", "data", "persons-avro-value2.avsc"))
elsif version == "3"
res.body = File.read(File.join(__dir__, "..", "data", "persons-avro-value3.avsc"))
elsif version == "4"
res.body = File.read(File.join(__dir__, "..", "data", "persons-avro-value4.avsc"))
+ elsif version == "latest"
+ res.body = File.read(File.join(__dir__, "..", "data", "persons-avro-value4.avsc"))
end
end
server.start
ensure
server.shutdown
@@ -316,66 +354,112 @@
assert_equal '200', client.request_get('/subjects/persons-avro-value/versions/3').code
assert_equal 4, @got.size
assert_equal 'persons-avro-value', @got[3][:registered_name]
assert_equal '3', @got[3][:version]
+
+ assert_equal '200', client.request_get('/schemas/ids/1').code
+ assert_equal 5, @got.size
+ assert_nil @got[4][:registered_name]
+ assert_equal '1', @got[4][:version]
+
+ assert_equal '200', client.request_get('/schemas/ids/21').code
+ assert_equal 6, @got.size
+ assert_nil @got[5][:registered_name]
+ assert_equal '21', @got[5][:version]
+
+ assert_equal '200', client.request_get('/schemas/ids/41').code
+ assert_equal 7, @got.size
+ assert_nil @got[6][:registered_name]
+ assert_equal '41', @got[6][:version]
+
+ assert_equal '200', client.request_get('/schemas/ids/42').code
+ assert_equal 8, @got.size
+ assert_nil @got[7][:registered_name]
+ assert_equal '42', @got[7][:version]
end
- def test_schema_url
+ data("use_confluent_schema" => true,
+ "plain" => false)
+ def test_schema_url(data)
+ config = data
conf = {
'schema_url' => "http://localhost:8081/subjects/persons-avro-value/versions/1",
- 'schema_url_key' => 'schema'
+ 'schema_url_key' => 'schema',
+ 'use_confluent_schema' => config,
}
d = create_driver(conf)
datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258}
- encoded = encode_datum(datum, REMOTE_SCHEMA)
+ encoded = encode_datum(datum, REMOTE_SCHEMA, config)
d.instance.parse(encoded) do |_time, record|
assert_equal datum, record
end
end
- def test_schema_url_with_version2
+ data("use_confluent_schema" => true,
+ "plain" => false)
+ def test_schema_url_with_version2(data)
+ config = data
conf = {
'schema_url' => "http://localhost:8081/subjects/persons-avro-value/versions/2",
- 'schema_url_key' => 'schema'
+ 'schema_url_key' => 'schema',
+ 'use_confluent_schema' => config,
}
d = create_driver(conf)
datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258}
- encoded = encode_datum(datum, REMOTE_SCHEMA2)
+ encoded = encode_datum(datum, REMOTE_SCHEMA2, config)
d.instance.parse(encoded) do |_time, record|
assert_equal datum.merge("verified" => false), record
end
end
- def test_schema_registery_with_subject_url
- conf = {
- 'schema_registery_with_subject_url' => "http://localhost:8081/subjects/persons-avro-value/",
- 'schema_url_key' => 'schema'
- }
+ def test_confluent_registry_with_schema_version
+ conf = Fluent::Config::Element.new(
+ '', '', {'@type' => 'avro'}, [
+ Fluent::Config::Element.new('confluent_registry', '', {
+ 'url' => 'http://localhost:8081',
+ 'subject' => 'persons-avro-value',
+ 'schema_key' => 'schema',
+ 'schema_version' => '1',
+ }, [])
+ ])
d = create_driver(conf)
datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258}
- encoded = encode_datum(datum, REMOTE_SCHEMA2)
+ schema = Yajl.load(File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc")))
+ encoded = encode_datum(datum, schema.fetch("schema"), true, 1)
d.instance.parse(encoded) do |_time, record|
- assert_equal datum.merge("verified" => nil), record
+ assert_equal datum, record
end
end
- def test_schema_registery_with_invalid_subject_url
- conf = {
- 'schema_registery_with_subject_url' => "http://localhost:8081/subjects/persons-avro-value",
- 'schema_url_key' => 'schema'
- }
- assert_raise(Fluent::ConfigError) do
- create_driver(conf)
+ def test_confluent_registry_with_fallback
+ conf = Fluent::Config::Element.new(
+ '', '', {'@type' => 'avro'}, [
+ Fluent::Config::Element.new('confluent_registry', '', {
+ 'url' => 'http://localhost:8081',
+ 'subject' => 'persons-avro-value',
+ 'schema_key' => 'schema',
+ }, [])
+ ])
+ d = create_driver(conf)
+ datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258}
+ schema = Yajl.load(File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc")))
+ encoded = encode_datum(datum, schema.fetch("schema"), true, 1)
+ d.instance.parse(encoded) do |_time, record|
+ assert_equal datum, record
end
end
end
private
- def encode_datum(datum, string_schema)
+ def encode_datum(datum, string_schema, use_confluent_schema = true, schema_id = 1)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
+ if use_confluent_schema
+ encoder.write(Fluent::Plugin::AvroParser::MAGIC_BYTE)
+ encoder.write([schema_id].pack("N"))
+ end
schema = Avro::Schema.parse(string_schema)
writer = Avro::IO::DatumWriter.new(schema)
writer.write(datum, encoder)
buffer.rewind
buffer.read