lib/fluent/plugin/out_kafka.rb in fluent-plugin-kafka-enchanced-0.5.30 vs lib/fluent/plugin/out_kafka.rb in fluent-plugin-kafka-enchanced-0.5.31

- old
+ new

@@ -1,9 +1,10 @@ require 'fluent/output' require 'fluent/plugin/kafka_plugin_util' require 'pry' require 'digest' +require 'redis' class Fluent::KafkaOutput < Fluent::Output Fluent::Plugin.register_output('kafka', self) config_param :brokers, :string, :default => 'localhost:9092', @@ -144,27 +145,58 @@ Proc.new { |tag, time, record| Yajl::Encoder.encode(record) } elsif @output_data_type == 'avro' require "avro_turf" require 'avro_turf/messaging' require "avro/builder" + init_redis Proc.new do |tag, time, record| - record = record.select{|key, value| !key.nil? && !key.empty?}.map{|k, v| [k.tr('[]-', '_').delete('$'), ((!v.is_a?(Fixnum) && !v.is_a?(Float)) ? v.to_s.force_encoding("UTF-8") : v)]}.to_h + record = record.select{|key, value| !key.nil? && !key.empty?}.map do |k, v| + [ + k.tr('[]-', '_').delete('$'), + (v.is_a?(Fixnum) || v.is_a?(Float) || v.nil? ? v : v.to_s.force_encoding("UTF-8")) + ] + end.to_h timestamp = Time.new record['enchilada_time_with_format'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%LZ") + @topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{Digest::MD5.new.hexdigest(record.keys.to_s)[0..5]}" - fields = record.map{|key, value| {'name' => key, 'type' => (value.is_a?(Fixnum) ? 'int' : (value.is_a?(Float) ? 'float' : 'string'))}} - record['enchilada_timestamp'] = timestamp.strftime('%s%3N').to_i - fields << {"name" => "enchilada_timestamp", "type" => "long"} - @topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{Digest::MD5.new.hexdigest(fields.to_s)[0..5]}" - schema_json = { - "type": "record", - "name": schema_name, - "fields": fields - }.to_json - schema = Avro::Schema.parse(schema_json) avro = AvroTurf::Messaging.new(registry_url: @schema_registry) - avro.encode(record, schema: schema, subject: "#{schema_name}-value") + + unless (stored_schema = get_schema_from_redis_by_name(schema_name)) + fields = record.map do |key, value| + { + 'name' => key, + 'type' => ['null', (value.is_a?(Fixnum) ? 'int' : (value.is_a?(Float) ? 
# --- Redis-backed Avro schema cache -----------------------------------------
# The avro formatter derives one schema per (tag, field-set) combination and
# registers it with the schema registry; these helpers cache the result in
# Redis so the registry is hit only once per schema.

# Create the Redis connection used as the schema cache.
# Held on the plugin instance (not a global) so each output instance owns
# its own connection. Uses redis-rb defaults (localhost:6379).
def init_redis
  @redis = Redis.new
end

# Store a schema-cache entry under +schema_name+.
#
# +schema+ is a Hash with keys 'schema' (Avro::Schema — its #to_json is the
# canonical schema JSON), 'schema_id' (Integer registry id) and
# 'field_types' (Hash of field name => avro type string).
def set_schema_to_redis(schema_name, schema)
  @redis.set(schema_name, schema.to_json)
end

# Fetch a cached schema entry by name.
#
# Returns a Hash with 'schema_id', 'schema' (re-parsed into an
# Avro::Schema) and 'field_types', or nil when the name is not cached.
# The raw value is fetched once and reused — the original issued a second
# redundant GET inside JSON.parse.
def get_schema_from_redis_by_name(schema_name)
  raw = @redis.get(schema_name)
  return nil unless raw

  parsed_schema = JSON.parse(raw)
  {
    'schema_id'   => parsed_schema['schema_id'],
    'schema'      => Avro::Schema.parse(parsed_schema['schema']),
    'field_types' => parsed_schema['field_types']
  }
end