lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.10 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.11

- old
+ new

@@ -16,10 +16,12 @@ Set brokers via Zookeeper: <zookeeper_host>:<zookeeper_port> DESC config_param :zookeeper_path, :string, :default => '/brokers/ids', :desc => "Path in path for Broker id. Default to /brokers/ids" + config_param :schema_registry, :string, :default => nil, + :desc => "Set Avro Schema Registry: <schema_registry_host>:<schema_registry_port>" config_param :default_topic, :string, :default => nil, :desc => "Output topic" config_param :default_message_key, :string, :default => nil config_param :default_partition_key, :string, :default => nil config_param :default_partition, :integer, :default => nil @@ -190,10 +192,29 @@ require 'ltsv' Proc.new { |tag, time, record| LTSV.dump(record) } elsif @output_data_type == 'msgpack' require 'msgpack' Proc.new { |tag, time, record| record.to_msgpack } + elsif @output_data_type == 'avro' + require "avro_turf" + require 'avro_turf/messaging' + require "avro/builder" + Proc.new do |tag, time, record| + record = record.select{|key, value| key.present?} + record['enchilada_timestamp'] = (Time.new).strftime('%s%3N') + fields = record.keys.map{|key| {'name' => key, 'type' => 'string'}} + schema_name = "#{tag.to_s.gsub('.', '_')}" + schema_json = { + "type": "record", + "name": schema_name, + "fields": fields + }.to_json + schema = Avro::Schema.parse(schema_json) + + avro = AvroTurf::Messaging.new(registry_url: @schema_registry) + avro.encode(record.map{|key, value| [key, value.to_s]}.to_h, schema: schema, subject: "#{schema_name}-value") + end elsif @output_data_type =~ /^attr:(.*)$/ @custom_attributes = $1.split(',').map(&:strip).reject(&:empty?) @custom_attributes.unshift('time') if @output_include_time @custom_attributes.unshift('tag') if @output_include_tag Proc.new { |tag, time, record| @@ -208,11 +229,11 @@ end end def write(chunk) tag = chunk.key - def_topic = @default_topic || tag + def_topic = @default_topic || tag.to_s.gsub('.', '_') producer = get_producer records_by_topic = {} bytes_by_topic = {} messages = 0 @@ -275,5 +296,6 @@ refresh_client(false) # Raise exception to retry sendind messages raise e end end +