lib/fluent/plugin/parser_avro.rb in fluent-plugin-parser-avro-0.2.0 vs lib/fluent/plugin/parser_avro.rb in fluent-plugin-parser-avro-0.3.0
- old
+ new
@@ -34,10 +34,12 @@
config_param :writers_schema_file, :string, :default => nil
config_param :writers_schema_json, :string, :default => nil
config_param :readers_schema_file, :string, :default => nil
config_param :readers_schema_json, :string, :default => nil
config_param :use_confluent_schema, :bool, :default => true
+ config_param :api_key, :string, :default => nil
+ config_param :api_secret, :string, :default => nil
config_section :confluent_registry, param_name: :avro_registry, required: false, multi: false do
config_param :url, :string
config_param :subject, :string
config_param :schema_key, :string, :default => "schema"
config_param :schema_version, :string, :default => "latest"
@@ -68,11 +70,11 @@
@writers_schema = Avro::Schema.parse(@writers_raw_schema)
@readers_schema = Avro::Schema.parse(@readers_raw_schema)
@reader = Avro::IO::DatumReader.new(@writers_schema, @readers_schema)
elsif @avro_registry
- @confluent_registry = Fluent::Plugin::ConfluentAvroSchemaRegistry.new(@avro_registry.url)
+ @confluent_registry = Fluent::Plugin::ConfluentAvroSchemaRegistry.new(@avro_registry.url, @api_key, @api_secret)
@raw_schema = @confluent_registry.subject_version(@avro_registry.subject,
@avro_registry.schema_key,
@avro_registry.schema_version)
@schema = Avro::Schema.parse(@raw_schema)
@reader = Avro::IO::DatumReader.new(@schema)
@@ -160,10 +162,18 @@
end
end
def fetch_schema(url, schema_key)
uri = URI.parse(url)
- response = Net::HTTP.get_response(uri)
+ response = if @api_key and @api_secret
+ Net::HTTP.start(uri.host, uri.port) do |http|
+ request = Net::HTTP::Get.new(uri.path)
+ request.basic_auth(@api_key, @api_secret)
+ http.request(request)
+ end
+ else
+ Net::HTTP.get_response(uri)
+ end
if schema_key.nil?
response.body
else
Yajl.load(response.body)[schema_key]
end