lib/fluent/plugin/parser_avro.rb in fluent-plugin-parser-avro-0.2.0 vs lib/fluent/plugin/parser_avro.rb in fluent-plugin-parser-avro-0.3.0

- old
+ new

@@ -34,10 +34,12 @@ config_param :writers_schema_file, :string, :default => nil config_param :writers_schema_json, :string, :default => nil config_param :readers_schema_file, :string, :default => nil config_param :readers_schema_json, :string, :default => nil config_param :use_confluent_schema, :bool, :default => true + config_param :api_key, :string, :default => nil + config_param :api_secret, :string, :default => nil config_section :confluent_registry, param_name: :avro_registry, required: false, multi: false do config_param :url, :string config_param :subject, :string config_param :schema_key, :string, :default => "schema" config_param :schema_version, :string, :default => "latest" @@ -68,11 +70,11 @@ @writers_schema = Avro::Schema.parse(@writers_raw_schema) @readers_schema = Avro::Schema.parse(@readers_raw_schema) @reader = Avro::IO::DatumReader.new(@writers_schema, @readers_schema) elsif @avro_registry - @confluent_registry = Fluent::Plugin::ConfluentAvroSchemaRegistry.new(@avro_registry.url) + @confluent_registry = Fluent::Plugin::ConfluentAvroSchemaRegistry.new(@avro_registry.url, @api_key, @api_secret) @raw_schema = @confluent_registry.subject_version(@avro_registry.subject, @avro_registry.schema_key, @avro_registry.schema_version) @schema = Avro::Schema.parse(@raw_schema) @reader = Avro::IO::DatumReader.new(@schema) @@ -160,10 +162,18 @@ end end def fetch_schema(url, schema_key) uri = URI.parse(url) - response = Net::HTTP.get_response(uri) + response = if @api_key and @api_secret + Net::HTTP.start(uri.host, uri.port) do |http| + request = Net::HTTP::Get.new(uri.path) + request.basic_auth(@api_key, @api_secret) + http.request(request) + end + else + Net::HTTP.get_response(uri) + end if schema_key.nil? response.body else Yajl.load(response.body)[schema_key] end