lib/fluent/plugin/out_kafka.rb in fluent-plugin-kafka-enchanced-0.5.30 vs lib/fluent/plugin/out_kafka.rb in fluent-plugin-kafka-enchanced-0.5.31
- old
+ new
@@ -1,9 +1,10 @@
require 'fluent/output'
require 'fluent/plugin/kafka_plugin_util'
require 'pry'
require 'digest'
+require 'redis'
class Fluent::KafkaOutput < Fluent::Output
Fluent::Plugin.register_output('kafka', self)
config_param :brokers, :string, :default => 'localhost:9092',
@@ -144,27 +145,58 @@
Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
elsif @output_data_type == 'avro'
require "avro_turf"
require 'avro_turf/messaging'
require "avro/builder"
+ init_redis
Proc.new do |tag, time, record|
- record = record.select{|key, value| !key.nil? && !key.empty?}.map{|k, v| [k.tr('[]-', '_').delete('$'), ((!v.is_a?(Fixnum) && !v.is_a?(Float)) ? v.to_s.force_encoding("UTF-8") : v)]}.to_h
+ record = record.select{|key, value| !key.nil? && !key.empty?}.map do |k, v|
+ [
+ k.tr('[]-', '_').delete('$'),
+ (v.is_a?(Fixnum) || v.is_a?(Float) || v.nil? ? v : v.to_s.force_encoding("UTF-8"))
+ ]
+ end.to_h
timestamp = Time.new
record['enchilada_time_with_format'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%LZ")
+ @topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{Digest::MD5.new.hexdigest(record.keys.to_s)[0..5]}"
- fields = record.map{|key, value| {'name' => key, 'type' => (value.is_a?(Fixnum) ? 'int' : (value.is_a?(Float) ? 'float' : 'string'))}}
- record['enchilada_timestamp'] = timestamp.strftime('%s%3N').to_i
- fields << {"name" => "enchilada_timestamp", "type" => "long"}
- @topic_name = schema_name = "#{tag.to_s.tr('.$:', '_')}_#{Digest::MD5.new.hexdigest(fields.to_s)[0..5]}"
- schema_json = {
- "type": "record",
- "name": schema_name,
- "fields": fields
- }.to_json
- schema = Avro::Schema.parse(schema_json)
avro = AvroTurf::Messaging.new(registry_url: @schema_registry)
- avro.encode(record, schema: schema, subject: "#{schema_name}-value")
+
+ unless (stored_schema = get_schema_from_redis_by_name(schema_name))
+ fields = record.map do |key, value|
+ {
+ 'name' => key,
+ 'type' => ['null', (value.is_a?(Fixnum) ? 'int' : (value.is_a?(Float) ? 'float' : 'string'))]
+ }
+ end
+ field_types = fields.map{|field| [field['name'], (field['type'] - ['null']).first]}.to_h
+ fields << {"name" => "enchilada_timestamp", "type" => "long"}
+ schema_json = {
+ "type": "record",
+ "name": schema_name,
+ "fields": fields
+ }.to_json
+ registry = avro.instance_variable_get('@registry')
+ schema = Avro::Schema.parse(schema_json)
+ schema_id = registry.register("#{schema_name}-value", schema)
+
+ stored_schema = {
+ 'schema' => schema,
+ 'schema_id' => schema_id,
+ 'field_types' => field_types
+ }
+
+ set_schema_to_redis(schema_name, stored_schema)
+
+ end
+
+ record['enchilada_timestamp'] = timestamp.strftime('%s%3N').to_i
+ record = record.map do |key, val|
+ [key, (stored_schema['field_types'][key] != 'string' || val.nil? ? val : val.to_s)]
+ end.to_h
+
+ avro.encode(record, stored_schema['schema_id'], schema: stored_schema['schema'])
end
elsif @output_data_type == 'ltsv'
require 'ltsv'
Proc.new { |tag, time, record| LTSV.dump(record) }
elsif @output_data_type == 'msgpack'
@@ -222,10 +254,30 @@
refresh_client
raise e
end
end
# Lazily creates the process-wide Redis connection used as the schema cache.
#
# Memoized with ||= so repeated calls (the formatter Proc may be rebuilt)
# reuse one connection instead of opening a fresh TCP connection each time.
#
# NOTE(review): stored in a global ($redis) and built with default options
# (localhost:6379, db 0) — host/port are not configurable; confirm intended.
def init_redis
  $redis ||= Redis.new
end
+
# Persists a schema bundle under +schema_name+ in Redis, serialized as JSON.
#
# schema_name - String key (also used as the Avro subject base name).
# schema      - Hash with 'schema', 'schema_id' and 'field_types' entries.
#
# Returns the Redis reply (e.g. "OK").
def set_schema_to_redis(schema_name, schema)
  payload = schema.to_json
  $redis.set(schema_name, payload)
end
+
# Looks up a previously cached schema bundle by name.
#
# schema_name - String key written earlier by set_schema_to_redis.
#
# Returns a Hash with 'schema_id', 'schema' (an Avro::Schema re-parsed from
# its JSON form) and 'field_types', or nil when the name is not cached.
def get_schema_from_redis_by_name(schema_name)
  stored_schema = $redis.get(schema_name)
  return nil unless stored_schema

  # Reuse the value already fetched above: the original issued a second
  # $redis.get here, doubling round-trips and racing concurrent
  # writers/expiry between the two reads.
  parsed_schema = JSON.parse(stored_schema)
  {
    'schema_id' => parsed_schema['schema_id'],
    'schema' => Avro::Schema.parse(parsed_schema['schema']),
    'field_types' => parsed_schema['field_types']
  }
end
+
end
+