lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.14 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.15
- old
+ new
@@ -229,11 +229,10 @@
end
end
def write(chunk)
tag = chunk.key
- def_topic = @default_topic || tag.to_s.gsub('.', '_')
producer = get_producer
records_by_topic = {}
bytes_by_topic = {}
messages = 0
@@ -251,10 +250,11 @@
record['time'.freeze] = time
end
end
record['tag'] = tag if @output_include_tag
- topic = (@exclude_topic_key ? record.delete('topic'.freeze) : record['topic'.freeze]) || def_topic
+ record_buf = @formatter_proc.call(tag, time, record)
+ topic = (@exclude_topic_key ? record.delete('topic'.freeze) : record['topic'.freeze]) || @topic_name
partition_key = (@exclude_partition_key ? record.delete('partition_key'.freeze) : record['partition_key'.freeze]) || @default_partition_key
partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition
message_key = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key
records_by_topic[topic] ||= 0