lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.19.2 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.19.3

- old
+ new

@@ -91,10 +91,14 @@ @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @max_buffer_bytesize = max_buffer_bytesize @compressor = compressor @partitioner = partitioner + + # The set of topics that are produced to. + @target_topics = Set.new + # A buffer organized by topic/partition. @buffer = MessageBuffer.new # Messages added by `#produce` but not yet assigned a partition. @pending_message_queue = PendingMessageQueue.new @@ -114,11 +118,12 @@ # If the producer is in transactional mode, all the message production # must be used when the producer is currently in transaction if @transaction_manager.transactional? && !@transaction_manager.in_transaction? raise 'You must trigger begin_transaction before producing messages' end - + + @target_topics.add(topic) @pending_message_queue.write(message) nil end @@ -185,10 +190,10 @@ end def deliver_messages_with_retries attempt = 0 - #@cluster.add_target_topics(@target_topics) + @cluster.add_target_topics(@target_topics) operation = ProduceOperation.new( cluster: @cluster, transaction_manager: @transaction_manager, buffer: @buffer,