lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.14.0 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.14.1
- old
+ new
@@ -67,16 +67,17 @@
required_acks: required_acks,
max_retries: max_retries,
retry_backoff: retry_backoff,
max_buffer_size: max_buffer_size,
max_buffer_bytesize: max_buffer_bytesize,
+ partitioner: @partitioner,
)
end
end
class TopicProducer
- def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
+ def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:)
@cluster = cluster
@transaction_manager = transaction_manager
@logger = logger
@instrumenter = instrumenter
@required_acks = required_acks == :all ? -1 : required_acks
@@ -84,10 +85,11 @@
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
@max_buffer_bytesize = max_buffer_bytesize
@compressor = compressor
+ @partitioner = partitioner
@topic = topic
@cluster.add_target_topics(Set.new([topic]))
# A buffer organized by topic/partition.
@@ -248,10 +250,10 @@
@pending_message_queue.each do |message|
partition = message.partition
begin
if partition.nil?
- partition = Partitioner.call(partition_count, message)
+ partition = @partitioner.call(partition_count, message)
end
@buffer.write(
value: message.value,
key: message.key,