lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.32 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.33
- old
+ new
@@ -233,11 +233,12 @@
schema_id = registry.register("#{schema_name}-value", schema)
stored_schema = {
'schema_json' => schema_json,
'schema_id' => schema_id,
- 'field_types' => field_types
+ 'field_types' => field_types,
+ 'schema' => schema
}
set_schema_to_redis(schema_name, stored_schema)
end
@@ -245,11 +246,11 @@
record['enchilada_timestamp'] = timestamp.strftime('%s%3N').to_i
record = record.map do |key, val|
[key, (stored_schema['field_types'][key] != 'string' || val.nil? ? val : val.to_s)]
end.to_h
- avro.encode(record, stored_schema['schema_id'], schema: Avro::Schema.parse(stored_schema['schema_json']))
+ avro.encode(record, stored_schema['schema_id'], schema: stored_schema['schema_json'])
end
elsif @output_data_type =~ /^attr:(.*)$/
@custom_attributes = $1.split(',').map(&:strip).reject(&:empty?)
@custom_attributes.unshift('time') if @output_include_time
@custom_attributes.unshift('tag') if @output_include_tag
@@ -345,10 +346,10 @@
def get_schema_from_redis_by_name schema_name
if stored_schema = $redis.get(schema_name)
parsed_schema = JSON.parse($redis.get(schema_name))
{
'schema_id' => parsed_schema['schema_id'],
- 'schema_json' => Avro::Schema.parse(parsed_schema['schema_json']),
+ 'schema' => Avro::Schema.parse(parsed_schema['schema_json']),
'field_types' => parsed_schema['field_types']
}
end
end
end