lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.14.0 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.14.1

- old
+ new

@@ -67,16 +67,17 @@ required_acks: required_acks, max_retries: max_retries, retry_backoff: retry_backoff, max_buffer_size: max_buffer_size, max_buffer_bytesize: max_buffer_bytesize, + partitioner: @partitioner, ) end end class TopicProducer - def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) + def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:) @cluster = cluster @transaction_manager = transaction_manager @logger = logger @instrumenter = instrumenter @required_acks = required_acks == :all ? -1 : required_acks @@ -84,10 +85,11 @@ @max_retries = max_retries @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @max_buffer_bytesize = max_buffer_bytesize @compressor = compressor + @partitioner = partitioner @topic = topic @cluster.add_target_topics(Set.new([topic])) # A buffer organized by topic/partition. @@ -248,10 +250,10 @@ @pending_message_queue.each do |message| partition = message.partition begin if partition.nil? - partition = Partitioner.call(partition_count, message) + partition = @partitioner.call(partition_count, message) end @buffer.write( value: message.value, key: message.key,