lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.34 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.35
- old
+ new
@@ -206,12 +206,10 @@
[
k.tr('[]-', '_').delete('$'),
(v.is_a?(Fixnum) || v.is_a?(Float) || v.nil? ? v : v.to_s.force_encoding("UTF-8"))
]
end.to_h
- timestamp = Time.new
- record['enchilada_time_with_format'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%LZ")
@topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{Digest::MD5.new.hexdigest(record.keys.to_s)[0..5]}"
avro = AvroTurf::Messaging.new(registry_url: @schema_registry)
unless (stored_schema = get_schema_from_redis_by_name(schema_name))
@@ -221,10 +219,11 @@
'type' => ['null', (value.is_a?(Fixnum) ? 'int' : (value.is_a?(Float) ? 'float' : 'string'))]
}
end
field_types = fields.map{|field| [field['name'], (field['type'] - ['null']).first]}.to_h
fields << {"name" => "enchilada_timestamp", "type" => "long"}
+ fields << {"name" => "enchilada_time_with_format", "type" => "string"}
schema_json = {
"type": "record",
"name": schema_name,
"fields": fields
}.to_json
@@ -241,9 +240,11 @@
set_schema_to_redis(schema_name, stored_schema)
end
+ timestamp = Time.new
+ record['enchilada_time_with_format'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%LZ")
record['enchilada_timestamp'] = timestamp.strftime('%s%3N').to_i
record = record.map do |key, val|
[key, (stored_schema['field_types'][key] != 'string' || val.nil? ? val : val.to_s)]
end.to_h