test/plugin/test_parser_avro.rb in fluent-plugin-parser-avro-0.2.0 vs test/plugin/test_parser_avro.rb in fluent-plugin-parser-avro-0.3.0

- old
+ new

@@ -448,9 +448,73 @@ assert_equal datum, record end end end + class AuthenticationTest < self + teardown do + @dummy_server_thread.kill + @dummy_server_thread.join + end + + setup do + @got = [] + @dummy_server_thread = Thread.new do + server = WEBrick::HTTPServer.new({:BindAddress => '127.0.0.1', :Port => AVRO_REGISTRY_PORT}) + begin + htpasswd = WEBrick::HTTPAuth::Htpasswd.new('dot.htpasswd') + htpasswd.set_passwd(nil, "API_KEY", "API_SECRET") + authenticator = WEBrick::HTTPAuth::BasicAuth.new(:UserDB => htpasswd, :Realm => "") + server.mount_proc('/') do |req, res| + authenticator.authenticate(req, res) + schema = File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc")) + res.body = schema + end + server.start + ensure + server.shutdown + end + end + end + + def test_authentication_failure + conf = Fluent::Config::Element.new( + '', '', {'@type' => 'avro', + 'api_key' => 'WRONG_KEY', + 'api_secret' => 'WRONG_SECRET' + }, [ + Fluent::Config::Element.new('confluent_registry', '', { + 'url' => 'http://localhost:8081', + 'subject' => 'persons-avro-value', + }, []) + ]) + assert_raise(Fluent::Plugin::ConfluentAvroSchemaRegistryUnauthorizedError) do + d = create_driver(conf) + d.instance.run + end + end + + def test_with_authentication + conf = Fluent::Config::Element.new( + '', '', {'@type' => 'avro', + 'api_key' => 'API_KEY', + 'api_secret' => 'API_SECRET' + }, [ + Fluent::Config::Element.new('confluent_registry', '', { + 'url' => 'http://localhost:8081', + 'subject' => 'persons-avro-value', + }, []) + ]) + datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258} + schema = Yajl.load(File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc"))) + encoded = encode_datum(datum, schema.fetch("schema"), true, 1) + d = create_driver(conf) + d.instance.parse(encoded) do |_time, record| + assert_equal datum, record + end + end + end + private def encode_datum(datum, string_schema, use_confluent_schema = true, schema_id = 1) buffer = StringIO.new encoder = Avro::IO::BinaryEncoder.new(buffer)