lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.4.2 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.5.0
- old
+ new
@@ -1,7 +1,15 @@
+require "set"
+require "kafka/partitioner"
+require "kafka/message_buffer"
+require "kafka/produce_operation"
+require "kafka/pending_message_queue"
+require "kafka/pending_message"
+require "kafka/compressor"
require 'kafka/producer'
+# for out_kafka_buffered
module Kafka
class Producer
def produce2(value, key: nil, topic:, partition: nil, partition_key: nil)
create_time = Time.now
@@ -17,8 +25,201 @@
@target_topics.add(topic)
@pending_message_queue.write(message)
nil
+ end
+ end
+end
+
+# for out_kafka2
+module Kafka
+ class Client
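+    # Builds a producer that is bound to a single topic, for use by out_kafka2.
+    # This mirrors Client#producer in ruby-kafka, but returns a TopicProducer
+    # instead of a Producer.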
+ def topic_producer(topic, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000)
+ compressor = Compressor.new(
+ codec_name: compression_codec,
+ threshold: compression_threshold,
+ instrumenter: @instrumenter,
+ )
+
+ TopicProducer.new(topic,
+ cluster: initialize_cluster,
+ logger: @logger,
+ instrumenter: @instrumenter,
+ compressor: compressor,
+ ack_timeout: ack_timeout,
+ required_acks: required_acks,
+ max_retries: max_retries,
+ retry_backoff: retry_backoff,
+ max_buffer_size: max_buffer_size,
+ max_buffer_bytesize: max_buffer_bytesize,
+ )
+ end
+ end
+
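+  # A cut-down producer that only ever writes to the topic given at
+  # construction time; partitions are still resolved per message.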
+ class TopicProducer
+ def initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
+ @cluster = cluster
+ @logger = logger
+ @instrumenter = instrumenter
+ @required_acks = required_acks == :all ? -1 : required_acks
+ @ack_timeout = ack_timeout
+ @max_retries = max_retries
+ @retry_backoff = retry_backoff
+ @max_buffer_size = max_buffer_size
+ @max_buffer_bytesize = max_buffer_bytesize
+ @compressor = compressor
+
+ @topic = topic
+ @cluster.add_target_topics(Set.new([topic]))
+
+ # A buffer organized by topic/partition.
+ @buffer = MessageBuffer.new
+
+ # Messages added by `#produce` but not yet assigned a partition.
+ @pending_message_queue = PendingMessageQueue.new
+ end
+
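+    # Buffers a single message for the fixed topic. The message sits in the
+    # pending queue and is only assigned a partition when #deliver_messages runs.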
+ def produce(value, key, partition, partition_key)
+ create_time = Time.now
+
+ message = PendingMessage.new(
+ value,
+ key,
+ @topic,
+ partition,
+ partition_key,
+ create_time,
+ key.to_s.bytesize + value.to_s.bytesize
+ )
+
+ @pending_message_queue.write(message)
+
+ nil
+ end
+
+ def deliver_messages
+ # There's no need to do anything if the buffer is empty.
+ return if buffer_size == 0
+
+ deliver_messages_with_retries
+ end
+
+ # Returns the number of messages currently held in the buffer.
+ #
+ # @return [Integer] buffer size.
+ def buffer_size
+ @pending_message_queue.size + @buffer.size
+ end
+
+ def buffer_bytesize
+ @pending_message_queue.bytesize + @buffer.bytesize
+ end
+
+ # Deletes all buffered messages.
+ #
+ # @return [nil]
+ def clear_buffer
+ @buffer.clear
+ @pending_message_queue.clear
+ end
+
+ # Closes all connections to the brokers.
+ #
+ # @return [nil]
+ def shutdown
+ @cluster.disconnect
+ end
+
+ private
+
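+    # Assigns partitions to pending messages and sends the buffer, retrying up
+    # to @max_retries times with @retry_backoff seconds between attempts.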
+ def deliver_messages_with_retries
+ attempt = 0
+
+      # The single target topic is already registered with the cluster in #initialize.
+
+ operation = ProduceOperation.new(
+ cluster: @cluster,
+ buffer: @buffer,
+ required_acks: @required_acks,
+ ack_timeout: @ack_timeout,
+ compressor: @compressor,
+ logger: @logger,
+ instrumenter: @instrumenter,
+ )
+
+ loop do
+ attempt += 1
+
+ @cluster.refresh_metadata_if_necessary!
+
+ assign_partitions!
+ operation.execute
+
+ if @required_acks.zero?
+ # No response is returned by the brokers, so we can't know which messages
+ # have been successfully written. Our only option is to assume that they all
+ # have.
+ @buffer.clear
+ end
+
+ if buffer_size.zero?
+ break
+ elsif attempt <= @max_retries
+ @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
+
+ sleep @retry_backoff
+ else
+ @logger.error "Failed to send all messages; keeping remaining messages in buffer"
+ break
+ end
+ end
+
+ unless @pending_message_queue.empty?
+ # Mark the cluster as stale in order to force a cluster metadata refresh.
+ @cluster.mark_as_stale!
+ raise DeliveryFailed, "Failed to assign partitions to #{@pending_message_queue.size} messages"
+ end
+
+ unless @buffer.empty?
+ partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
+
+ raise DeliveryFailed, "Failed to send messages to #{partitions}"
+ end
+ end
+
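+    # Moves pending messages into the per-partition buffer, using Partitioner
+    # for messages without an explicit partition. Messages that cannot be
+    # assigned stay in the pending queue so the next attempt can retry them.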
+ def assign_partitions!
+ failed_messages = []
+ partition_count = @cluster.partitions_for(@topic).count
+
+ @pending_message_queue.each do |message|
+ partition = message.partition
+
+ begin
+ if partition.nil?
+ partition = Partitioner.partition_for_key(partition_count, message)
+ end
+
+ @buffer.write(
+ value: message.value,
+ key: message.key,
+ topic: message.topic,
+ partition: partition,
+ create_time: message.create_time,
+ )
+        rescue Kafka::Error
+ failed_messages << message
+ end
+ end
+
+ if failed_messages.any?
+ failed_messages.group_by(&:topic).each do |topic, messages|
+ @logger.error "Failed to assign partitions to #{messages.count} messages in #{topic}"
+ end
+
+ @cluster.mark_as_stale!
+ end
+
+ @pending_message_queue.replace(failed_messages)
end
end
end
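
For orientation, a minimal usage sketch of the two extension points above; the broker address, topic names, keys and records are illustrative only, not taken from the plugin (the real wiring lives in out_kafka_buffered and out_kafka2):

require "kafka"
require "fluent/plugin/kafka_producer_ext"

kafka = Kafka.new(seed_brokers: ["localhost:9092"], client_id: "fluentd")  # illustrative broker

# out_kafka_buffered path: a regular ruby-kafka producer, with the topic passed per call.
producer = kafka.producer(max_buffer_size: 1000)
producer.produce2('{"message":"hello"}', topic: "logs", partition_key: "host-1")
producer.deliver_messages
producer.shutdown

# out_kafka2 path: a producer bound to one topic at creation time.
topic_producer = kafka.topic_producer("logs", compression_codec: :gzip, required_acks: 1)
topic_producer.produce('{"message":"hello"}', "record-key", nil, "host-1")
topic_producer.deliver_messages
topic_producer.shutdown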