lib/fluent/plugin/out_kafka.rb in fluent-plugin-kafka-enchanced-0.5.1 vs lib/fluent/plugin/out_kafka.rb in fluent-plugin-kafka-enchanced-0.5.2
- old
+ new
@@ -174,10 +174,10 @@
else
record['time'] = time
end
end
record['tag'] = tag if @output_include_tag
- topic = (@exclude_topic_key ? record.delete('topic') : record['topic']) || @default_topic || tag
+ topic = (@exclude_topic_key ? record.delete('topic') : record['topic']) || @default_topic || tag.to_s.gsub('.', '_')
partition_key = (@exclude_partition_key ? record.delete('partition_key') : record['partition_key']) || @default_partition_key
partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition
message_key = (@exclude_message_key ? record.delete('message_key') : record['message_key']) || @default_message_key
value = @formatter_proc.call(tag, time, record)