lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.13 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.14
- old
+ new
@@ -197,14 +197,14 @@
elsif @output_data_type == 'avro'
require "avro_turf"
require 'avro_turf/messaging'
require "avro/builder"
Proc.new do |tag, time, record|
- record = record.select{|key, value| key.present?}
+ record = record.select{|key, value| key.present?}.map{|k, v| [k.tr('[]-', '_').delete('$'), v.to_s]}.to_h
record['enchilada_timestamp'] = (Time.new).strftime('%s%3N')
fields = record.keys.map{|key| {'name' => key, 'type' => 'string'}}
- schema_name = "#{tag.to_s.gsub('.', '_')}"
+ @topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{fields.to_s.hash.abs}"
schema_json = {
"type": "record",
"name": schema_name,
"fields": fields
}.to_json
@@ -296,6 +296,7 @@
refresh_client(false)
# Raise exception to retry sendind messages
raise e
end
end
+