lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.32 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.33

- old
+ new

@@ -233,11 +233,12 @@ schema_id = registry.register("#{schema_name}-value", schema) stored_schema = { 'schema_json' => schema_json, 'schema_id' => schema_id, - 'field_types' => field_types + 'field_types' => field_types, + 'schema' => schema } set_schema_to_redis(schema_name, stored_schema) end @@ -245,11 +246,11 @@ record['enchilada_timestamp'] = timestamp.strftime('%s%3N').to_i record = record.map do |key, val| [key, (stored_schema['field_types'][key] != 'string' || val.nil? ? val : val.to_s)] end.to_h - avro.encode(record, stored_schema['schema_id'], schema: Avro::Schema.parse(stored_schema['schema_json'])) + avro.encode(record, stored_schema['schema_id'], schema: stored_schema['schema_json']) end elsif @output_data_type =~ /^attr:(.*)$/ @custom_attributes = $1.split(',').map(&:strip).reject(&:empty?) @custom_attributes.unshift('time') if @output_include_time @custom_attributes.unshift('tag') if @output_include_tag @@ -345,10 +346,10 @@ def get_schema_from_redis_by_name schema_name if stored_schema = $redis.get(schema_name) parsed_schema = JSON.parse($redis.get(schema_name)) { 'schema_id' => parsed_schema['schema_id'], - 'schema_json' => Avro::Schema.parse(parsed_schema['schema_json']), + 'schema' => Avro::Schema.parse(parsed_schema['schema_json']), 'field_types' => parsed_schema['field_types'] } end end end