test/plugin/test_parser_avro.rb in fluent-plugin-parser-avro-0.2.0 vs test/plugin/test_parser_avro.rb in fluent-plugin-parser-avro-0.3.0
- old
+ new
@@ -448,9 +448,73 @@
assert_equal datum, record
end
end
end
+ class AuthenticationTest < self
+ teardown do
+ @dummy_server_thread.kill
+ @dummy_server_thread.join
+ end
+
+ setup do
+ @got = []
+ @dummy_server_thread = Thread.new do
+ server = WEBrick::HTTPServer.new({:BindAddress => '127.0.0.1', :Port => AVRO_REGISTRY_PORT})
+ begin
+ htpasswd = WEBrick::HTTPAuth::Htpasswd.new('dot.htpasswd')
+ htpasswd.set_passwd(nil, "API_KEY", "API_SECRET")
+ authenticator = WEBrick::HTTPAuth::BasicAuth.new(:UserDB => htpasswd, :Realm => "")
+ server.mount_proc('/') do |req, res|
+ authenticator.authenticate(req, res)
+ schema = File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc"))
+ res.body = schema
+ end
+ server.start
+ ensure
+ server.shutdown
+ end
+ end
+ end
+
+ def test_authentication_failure
+ conf = Fluent::Config::Element.new(
+ '', '', {'@type' => 'avro',
+ 'api_key' => 'WRONG_KEY',
+ 'api_secret' => 'WRONG_SECRET'
+ }, [
+ Fluent::Config::Element.new('confluent_registry', '', {
+ 'url' => 'http://localhost:8081',
+ 'subject' => 'persons-avro-value',
+ }, [])
+ ])
+ assert_raise(Fluent::Plugin::ConfluentAvroSchemaRegistryUnauthorizedError) do
+ d = create_driver(conf)
+ d.instance.run
+ end
+ end
+
+ def test_with_authentication
+ conf = Fluent::Config::Element.new(
+ '', '', {'@type' => 'avro',
+ 'api_key' => 'API_KEY',
+ 'api_secret' => 'API_SECRET'
+ }, [
+ Fluent::Config::Element.new('confluent_registry', '', {
+ 'url' => 'http://localhost:8081',
+ 'subject' => 'persons-avro-value',
+ }, [])
+ ])
+ datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 159202477258}
+ schema = Yajl.load(File.read(File.join(__dir__, "..", "data", "schema-persions-value-1.avsc")))
+ encoded = encode_datum(datum, schema.fetch("schema"), true, 1)
+ d = create_driver(conf)
+ d.instance.parse(encoded) do |_time, record|
+ assert_equal datum, record
+ end
+ end
+ end
+
private
def encode_datum(datum, string_schema, use_confluent_schema = true, schema_id = 1)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)