lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.18.0 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.18.1

- old
+ new

@@ -36,13 +36,19 @@ end end end # for out_kafka2 +# Majority (if not all) of this code is lifted from https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb +# with the main difference where we have removed any checks regarding max_buffer_bytesize and max_buffer_size +# The reason for doing this is to provide a better UX for our users where they only need to set those bounds in +# the Buffer section using `chunk_limit_size` and `chunk_limit_records`. +# +# We should reconsider this in the future in case the `ruby-kafka` library drastically changes its internal. module Kafka class Client - def topic_producer(topic, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60) + def custom_producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60) cluster = initialize_cluster compressor = Compressor.new( codec_name: compression_codec, threshold: compression_threshold, instrumenter: @instrumenter, @@ -55,12 +61,11 @@ transactional: transactional, transactional_id: transactional_id, transactional_timeout: transactional_timeout, ) - TopicProducer.new(topic, - cluster: cluster, + CustomProducer.new(cluster: cluster, transaction_manager: transaction_manager, logger: @logger, instrumenter: @instrumenter, compressor: compressor, ack_timeout: ack_timeout, @@ -72,12 +77,12 @@ partitioner: @partitioner, ) end end - class TopicProducer - def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:) + class CustomProducer + def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:) @cluster = cluster @transaction_manager = transaction_manager @logger = logger @instrumenter = instrumenter @required_acks = required_acks == :all ? -1 : required_acks @@ -86,27 +91,23 @@ @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @max_buffer_bytesize = max_buffer_bytesize @compressor = compressor @partitioner = partitioner - - @topic = topic - @cluster.add_target_topics(Set.new([topic])) - # A buffer organized by topic/partition. @buffer = MessageBuffer.new # Messages added by `#produce` but not yet assigned a partition. @pending_message_queue = PendingMessageQueue.new end - def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now) + def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now, topic: nil) message = PendingMessage.new( value: value, key: key, headers: headers, - topic: @topic, + topic: topic, partition: partition, partition_key: partition_key, create_time: create_time ) @@ -243,15 +244,16 @@ end end def assign_partitions! failed_messages = [] - partition_count = @cluster.partitions_for(@topic).count @pending_message_queue.each do |message| partition = message.partition begin + partition_count = @cluster.partitions_for(message.topic).count + if partition.nil? partition = @partitioner.call(partition_count, message) end @buffer.write(