lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.23 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.24

- old
+ new

@@ -198,14 +198,15 @@ elsif @output_data_type == 'avro' require "avro_turf" require 'avro_turf/messaging' require "avro/builder" Proc.new do |tag, time, record| - record = record.select{|key, value| !key.nil? && !key.empty?}.map{|k, v| [k.tr('[]-', '_').delete('$'), v.to_s.force_encoding("UTF-8")]}.to_h + record = record.select{|key, value| !key.nil? && !key.empty?}.map{|k, v| [k.tr('[]-', '_').delete('$'), (v.is_a?(String) ? v.to_s.force_encoding("UTF-8") : v)]}.to_h timestamp = Time.new - record['enchilada_timestamp'] = timestamp.strftime('%s%3N') + record['enchilada_timestamp'] = timestamp.strftime('%s%3N').to_i record['enchilada_time_with_format'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%LZ") - fields = record.keys.map{|key| {'name' => key, 'type' => 'string'}} + + fields = record.map{|key, value| {'name' => key, 'type' => (value.is_a?(Fixnum) ? 'int' : (value.is_a?(Float) ? 'float' : 'string'))}} @topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{Digest::MD5.new.hexdigest(fields.to_s)[0..5]}" schema_json = { "type": "record", "name": schema_name, "fields": fields