lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.20 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.21
- old
+ new
@@ -1,8 +1,9 @@
require 'thread'
require 'fluent/output'
require 'fluent/plugin/kafka_plugin_util'
+require 'digest'
class Fluent::KafkaOutputBuffered < Fluent::BufferedOutput
Fluent::Plugin.register_output('kafka_buffered', self)
config_param :brokers, :string, :default => 'localhost:9092',
@@ -198,12 +199,14 @@
require "avro_turf"
require 'avro_turf/messaging'
require "avro/builder"
Proc.new do |tag, time, record|
record = record.select{|key, value| !key.nil? && !key.empty?}.map{|k, v| [k.tr('[]-', '_').delete('$'), v.to_s]}.to_h
- record['enchilada_timestamp'] = (Time.new).strftime('%s%3N')
+ timestamp = Time.new
+ record['enchilada_timestamp'] = timestamp.strftime('%s%3N')
+ record['enchilada_time_with_format'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%LZ")
fields = record.keys.map{|key| {'name' => key, 'type' => 'string'}}
- @topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{fields.to_s.hash.abs.to_s[0..5]}"
+ @topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{Digest::MD5.new.hexdigest(fields.to_s)[0..5]}"
schema_json = {
"type": "record",
"name": schema_name,
"fields": fields
}.to_json