lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.30 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.31
- old
+ new
@@ -1,9 +1,10 @@
require 'thread'
require 'fluent/output'
require 'fluent/plugin/kafka_plugin_util'
require 'digest'
+require 'redis'
class Fluent::KafkaOutputBuffered < Fluent::BufferedOutput
Fluent::Plugin.register_output('kafka_buffered', self)
config_param :brokers, :string, :default => 'localhost:9092',
@@ -197,10 +198,11 @@
Proc.new { |tag, time, record| record.to_msgpack }
elsif @output_data_type == 'avro'
require "avro_turf"
require 'avro_turf/messaging'
require "avro/builder"
+ init_redis
Proc.new do |tag, time, record|
record = record.select{|key, value| !key.nil? && !key.empty?}.map{|k, v| [k.tr('[]-', '_').delete('$'), ((!v.is_a?(Fixnum) && !v.is_a?(Float)) ? v.to_s.force_encoding("UTF-8") : v)]}.to_h
timestamp = Time.new
record['enchilada_time_with_format'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%LZ")
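
For context, the unchanged proc above normalizes each record before Avro encoding: nil or empty keys are dropped, the characters [, ] and - in key names are rewritten to _, $ is stripped, and any value that is not a number is coerced to a UTF-8 string. A minimal standalone sketch of that transformation, with a hypothetical input record and Integer standing in for the deprecated Fixnum check:

record = { 'user-id' => 42, '$meta[source]' => 'web', '' => 'dropped', 'note' => 'hello' }

normalized = record.select { |key, _| !key.nil? && !key.empty? }.map { |k, v|
  key = k.tr('[]-', '_').delete('$')   # '[', ']' and '-' become '_', '$' is removed
  value = (v.is_a?(Integer) || v.is_a?(Float)) ? v : v.to_s.force_encoding('UTF-8')
  [key, value]
}.to_h

# normalized => { 'user_id' => 42, 'meta_source_' => 'web', 'note' => 'hello' }
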
@@ -299,9 +301,28 @@
# For safety, refresh client and its producers
shutdown_producers
refresh_client(false)
# Raise exception to retry sending messages
raise e
+ end
+
+  def init_redis
+    $redis = Redis.new
+  end
+
+  def set_schema_to_redis schema_name, schema
+    $redis.set(schema_name, schema.to_json)
+  end
+
+  def get_schema_from_redis_by_name schema_name
+    if stored_schema = $redis.get(schema_name)
+      parsed_schema = JSON.parse(stored_schema)
+      {
+        'schema_id' => parsed_schema['schema_id'],
+        'schema' => Avro::Schema.parse(parsed_schema['schema']),
+        'field_types' => parsed_schema['field_types']
+      }
+    end
end
end
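
Taken together, the additions in this release wire a Redis-backed cache for Avro schema metadata into the avro output path: init_redis opens a process-wide connection ($redis, created with redis-rb defaults, i.e. localhost:6379 unless REDIS_URL points elsewhere), set_schema_to_redis stores a schema description as JSON under a name, and get_schema_from_redis_by_name returns it with the schema re-parsed through Avro::Schema.parse, or nil on a cache miss. Note the helpers use to_json and JSON.parse, and so assume the json library is already loaded elsewhere in the process. A hedged sketch of the intended cache-aside usage; the schema_name key and the build/register/infer helpers below are hypothetical illustrations, not part of the plugin:

schema_name = "#{tag}_value"                       # hypothetical cache key per tag/topic

if cached = get_schema_from_redis_by_name(schema_name)
  schema_id   = cached['schema_id']                # registry id stored alongside the schema
  avro_schema = cached['schema']                   # already an Avro::Schema object
  field_types = cached['field_types']
else
  schema_json = build_avro_schema_for(record)      # hypothetical: derive an Avro schema (JSON string)
  schema_id   = register_schema(schema_json)       # hypothetical: register with the schema registry
  field_types = infer_field_types(record)          # hypothetical: remember per-field types
  set_schema_to_redis(schema_name,
                      'schema_id'   => schema_id,
                      'schema'      => schema_json,
                      'field_types' => field_types)
  avro_schema = Avro::Schema.parse(schema_json)
end

Using a single global $redis handle keeps the cache shared across all buffer flushes in the worker process, at the cost of one more external dependency and no connection configuration beyond the redis-rb defaults.
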