lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.18.0 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.18.1
- old
+ new
@@ -36,13 +36,19 @@
    end
  end
end
# for out_kafka2
+# Most (if not all) of this code is lifted from https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb,
+# the main difference being that we have removed the checks on max_buffer_bytesize and max_buffer_size.
+# The reason for doing this is to provide a better UX for our users: they only need to set those bounds in
+# the Buffer section, using `chunk_limit_size` and `chunk_limit_records`.
+#
+# We should reconsider this in the future in case the `ruby-kafka` library drastically changes its internals.
module Kafka
  class Client
-    def topic_producer(topic, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60)
+    def custom_producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60)
      cluster = initialize_cluster
      compressor = Compressor.new(
        codec_name: compression_codec,
        threshold: compression_threshold,
        instrumenter: @instrumenter,
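
The `chunk_limit_size` and `chunk_limit_records` bounds that the new header comment points to are ordinary Fluentd buffer parameters. A minimal out_kafka2 configuration of the kind the comment has in mind might look like this (broker address, topic, and limit values are illustrative, not defaults):

    <match app.**>
      @type kafka2
      brokers broker1:9092
      default_topic app.logs
      <format>
        @type json
      </format>
      <buffer topic>
        @type memory
        chunk_limit_size 8MB      # bounds bytes per buffered chunk
        chunk_limit_records 5000  # bounds records per buffered chunk
      </buffer>
    </match>

With the limits enforced there, the producer below no longer needs its own max_buffer_size / max_buffer_bytesize guards.
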
@@ -55,12 +61,11 @@
        transactional: transactional,
        transactional_id: transactional_id,
        transactional_timeout: transactional_timeout,
      )
-      TopicProducer.new(topic,
-        cluster: cluster,
+      CustomProducer.new(cluster: cluster,
        transaction_manager: transaction_manager,
        logger: @logger,
        instrumenter: @instrumenter,
        compressor: compressor,
        ack_timeout: ack_timeout,
@@ -72,12 +77,12 @@
        partitioner: @partitioner,
      )
    end
  end

-  class TopicProducer
-    def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:)
+  class CustomProducer
+    def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:)
      @cluster = cluster
      @transaction_manager = transaction_manager
      @logger = logger
      @instrumenter = instrumenter
      @required_acks = required_acks == :all ? -1 : required_acks
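
Note that the constructor still accepts and stores max_buffer_size and max_buffer_bytesize; per the header comment, only the enforcement was dropped. For comparison, upstream ruby-kafka's Producer#produce guards its buffer with checks of roughly this shape (paraphrased from the upstream file linked above, not verbatim):

    # Paraphrase of upstream ruby-kafka's overflow guards; this file omits them.
    if buffer_size >= @max_buffer_size
      buffer_overflow topic, "max buffer size (#{@max_buffer_size} messages) reached"
    end
    if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
      buffer_overflow topic, "max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
    end
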
@@ -86,27 +91,23 @@
      @retry_backoff = retry_backoff
      @max_buffer_size = max_buffer_size
      @max_buffer_bytesize = max_buffer_bytesize
      @compressor = compressor
      @partitioner = partitioner
-
-      @topic = topic
-      @cluster.add_target_topics(Set.new([topic]))
-
      # A buffer organized by topic/partition.
      @buffer = MessageBuffer.new

      # Messages added by `#produce` but not yet assigned a partition.
      @pending_message_queue = PendingMessageQueue.new
    end

-    def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now)
+    def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now, topic: nil)
      message = PendingMessage.new(
        value: value,
        key: key,
        headers: headers,
-        topic: @topic,
+        topic: topic,
        partition: partition,
        partition_key: partition_key,
        create_time: create_time
      )
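
Together these hunks turn the one-producer-per-topic design into a single shared producer: the topic now travels with each message instead of being fixed at construction. A sketch of the resulting call-site change, assuming a Kafka::Client instance named kafka (illustrative; not lifted from out_kafka2 itself):

    # 0.18.0: one producer object per topic, topic bound at construction.
    producer = kafka.topic_producer("app.logs", required_acks: :all)
    producer.produce(payload, key: "k1")

    # 0.18.1: one producer shared by all topics, topic passed per message.
    producer = kafka.custom_producer(required_acks: :all)
    producer.produce(payload, key: "k1", topic: "app.logs")
    producer.produce(payload, key: "k2", topic: "app.metrics")
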
@@ -243,15 +244,16 @@
      end
    end

    def assign_partitions!
      failed_messages = []
-      partition_count = @cluster.partitions_for(@topic).count
      @pending_message_queue.each do |message|
        partition = message.partition

        begin
+          partition_count = @cluster.partitions_for(message.topic).count
+
          if partition.nil?
            partition = @partitioner.call(partition_count, message)
          end

          @buffer.write(
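
Because a single buffer can now hold messages for many topics, the partition count is resolved per message inside the loop instead of once per producer. In ruby-kafka, Cluster#partitions_for also registers the topic as a metadata target, which is what lets the explicit add_target_topics call in the old initializer go away. Assuming the rest of the lifted Producer API (such as deliver_messages) is retained later in this file, the fan-out works like this (illustrative):

    producer = kafka.custom_producer
    producer.produce("cpu=0.93", topic: "system.metrics")
    producer.produce("disk full", topic: "system.logs")
    # assign_partitions! looks up each message's own topic, so both
    # topics receive valid partition assignments in one delivery pass.
    producer.deliver_messages
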