lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.10 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.11
- old
+ new
@@ -16,10 +16,12 @@
Set brokers via Zookeeper:
<zookeeper_host>:<zookeeper_port>
DESC
config_param :zookeeper_path, :string, :default => '/brokers/ids',
:desc => "Path in path for Broker id. Default to /brokers/ids"
+ config_param :schema_registry, :string, :default => nil,
+ :desc => "Set Avro Schema Registry: <schema_registry_host>:<schema_registry_port>"
config_param :default_topic, :string, :default => nil,
:desc => "Output topic"
config_param :default_message_key, :string, :default => nil
config_param :default_partition_key, :string, :default => nil
config_param :default_partition, :integer, :default => nil
@@ -190,10 +192,29 @@
require 'ltsv'
Proc.new { |tag, time, record| LTSV.dump(record) }
elsif @output_data_type == 'msgpack'
require 'msgpack'
Proc.new { |tag, time, record| record.to_msgpack }
+ elsif @output_data_type == 'avro'
+ require "avro_turf"
+ require 'avro_turf/messaging'
+ require "avro/builder"
+ Proc.new do |tag, time, record|
+ record = record.select{|key, value| key.present?}
+ record['enchilada_timestamp'] = (Time.new).strftime('%s%3N')
+ fields = record.keys.map{|key| {'name' => key, 'type' => 'string'}}
+ schema_name = "#{tag.to_s.gsub('.', '_')}"
+ schema_json = {
+ "type": "record",
+ "name": schema_name,
+ "fields": fields
+ }.to_json
+ schema = Avro::Schema.parse(schema_json)
+
+ avro = AvroTurf::Messaging.new(registry_url: @schema_registry)
+ avro.encode(record.map{|key, value| [key, value.to_s]}.to_h, schema: schema, subject: "#{schema_name}-value")
+ end
elsif @output_data_type =~ /^attr:(.*)$/
@custom_attributes = $1.split(',').map(&:strip).reject(&:empty?)
@custom_attributes.unshift('time') if @output_include_time
@custom_attributes.unshift('tag') if @output_include_tag
Proc.new { |tag, time, record|
@@ -208,11 +229,11 @@
end
end
def write(chunk)
tag = chunk.key
- def_topic = @default_topic || tag
+ def_topic = @default_topic || tag.to_s.gsub('.', '_')
producer = get_producer
records_by_topic = {}
bytes_by_topic = {}
messages = 0
@@ -275,5 +296,6 @@
refresh_client(false)
# Raise exception to retry sendind messages
raise e
end
end
+