lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.14 vs lib/fluent/plugin/out_kafka_buffered.rb in fluent-plugin-kafka-enchanced-0.5.15

- old
+ new

@@ -229,11 +229,10 @@ end end def write(chunk) tag = chunk.key - def_topic = @default_topic || tag.to_s.gsub('.', '_') producer = get_producer records_by_topic = {} bytes_by_topic = {} messages = 0 @@ -251,10 +250,11 @@ record['time'.freeze] = time end end record['tag'] = tag if @output_include_tag - topic = (@exclude_topic_key ? record.delete('topic'.freeze) : record['topic'.freeze]) || def_topic + record_buf = @formatter_proc.call(tag, time, record) + topic = (@exclude_topic_key ? record.delete('topic'.freeze) : record['topic'.freeze]) || @topic_name partition_key = (@exclude_partition_key ? record.delete('partition_key'.freeze) : record['partition_key'.freeze]) || @default_partition_key partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition message_key = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key records_by_topic[topic] ||= 0