lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.19.2 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.19.3
- old
+ new
@@ -91,10 +91,14 @@
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
@max_buffer_bytesize = max_buffer_bytesize
@compressor = compressor
@partitioner = partitioner
+
+ # The set of topics that are produced to.
+ @target_topics = Set.new
+
# A buffer organized by topic/partition.
@buffer = MessageBuffer.new
# Messages added by `#produce` but not yet assigned a partition.
@pending_message_queue = PendingMessageQueue.new
@@ -114,11 +118,12 @@
# If the producer is in transactional mode, all the message production
# must be used when the producer is currently in transaction
if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
raise 'You must trigger begin_transaction before producing messages'
end
-
+
+ @target_topics.add(topic)
@pending_message_queue.write(message)
nil
end
@@ -185,10 +190,10 @@
end
def deliver_messages_with_retries
attempt = 0
- #@cluster.add_target_topics(@target_topics)
+ @cluster.add_target_topics(@target_topics)
operation = ProduceOperation.new(
cluster: @cluster,
transaction_manager: @transaction_manager,
buffer: @buffer,