lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.30 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.31

- old
+ new

@@ -1,9 +1,10 @@ require 'thread' require 'fluent/output' require 'fluent/plugin/kafka_plugin_util' require 'digest' +require 'redis' class Fluent::KafkaOutputBuffered < Fluent::BufferedOutput Fluent::Plugin.register_output('kafka_buffered', self) config_param :brokers, :string, :default => 'localhost:9092', @@ -197,10 +198,11 @@ Proc.new { |tag, time, record| record.to_msgpack } elsif @output_data_type == 'avro' require "avro_turf" require 'avro_turf/messaging' require "avro/builder" + init_redis Proc.new do |tag, time, record| record = record.select{|key, value| !key.nil? && !key.empty?}.map{|k, v| [k.tr('[]-', '_').delete('$'), ((!v.is_a?(Fixnum) && !v.is_a?(Float)) ? v.to_s.force_encoding("UTF-8") : v)]}.to_h timestamp = Time.new record['enchilada_time_with_format'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%LZ") @@ -299,9 +301,28 @@ # For safety, refresh client and its producers shutdown_producers refresh_client(false) # Raise exception to retry sendind messages raise e + end + + def init_redis + $redis = Redis.new + end + + def set_schema_to_redis schema_name, schema + $redis.set(schema_name, schema.to_json) + end + + def get_schema_from_redis_by_name schema_name + if stored_schema = $redis.get(schema_name) + parsed_schema = JSON.parse($redis.get(schema_name)) + { + 'schema_id' => parsed_schema['schema_id'], + 'schema' => Avro::Schema.parse(parsed_schema['schema']), + 'field_types' => parsed_schema['field_types'] + } + end end end