lib/kafka/producer.rb in ruby-kafka-0.1.6 vs lib/kafka/producer.rb in ruby-kafka-0.1.7

- old
+ new

@@ -1,8 +1,10 @@
+require "set"
 require "kafka/partitioner"
 require "kafka/message_buffer"
 require "kafka/produce_operation"
+require "kafka/pending_message_queue"
 require "kafka/pending_message"
 require "kafka/compression"
 
 module Kafka
 
@@ -45,14 +47,46 @@
   #
   # After this, we check if the buffer is empty. If it is, we're all done. If it's
   # not, we do another round of requests, this time with just the remaining messages.
   # We do this for as long as `max_retries` permits.
   #
+  # ## Compression
+  #
+  # Depending on what kind of data you produce, enabling compression may yield improved
+  # bandwidth and space usage. Compression in Kafka is done on entire message sets
+  # rather than on individual messages. This improves the compression rate and generally
+  # means that compression works better the larger your buffers get, since the message
+  # sets will be larger by the time they're compressed.
+  #
+  # Since many workloads have variations in throughput and distribution across partitions,
+  # it's possible to configure a threshold for when to enable compression by setting
+  # `compression_threshold`. Only if the defined number of messages are buffered for a
+  # partition will the messages be compressed.
+  #
+  # Compression is enabled by passing the `compression_codec` parameter with the
+  # name of one of the algorithms allowed by Kafka:
+  #
+  # * `:snappy` for [Snappy](http://google.github.io/snappy/) compression.
+  # * `:gzip` for [gzip](https://en.wikipedia.org/wiki/Gzip) compression.
+  #
+  # By default, all message sets will be compressed if you specify a compression
+  # codec. To increase the compression threshold, set `compression_threshold` to
+  # an integer value higher than one.
+  #
   # ## Instrumentation
   #
+  # Whenever {#produce} is called, the notification `produce_message.producer.kafka`
+  # will be emitted with the following payload:
+  #
+  # * `value` – the message value.
+  # * `key` – the message key.
+  # * `topic` – the topic that was produced to.
+  # * `buffer_size` – the buffer size after adding the message.
+  # * `max_buffer_size` – the maximum allowed buffer size for the producer.
+  #
   # After {#deliver_messages} completes, the notification
-  # `deliver_messages.producer.kafka` will be emitted.
+  # `deliver_messages.producer.kafka` will be emitted with the following payload:
   #
   # * `message_count` – the total number of messages that the producer tried to
   #   deliver. Note that not all messages may get delivered.
   # * `delivered_message_count` – the number of messages that were successfully
   #   delivered.
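With these options, a producer can defer compression until a partition's message set is large enough to compress well. The sketch below assumes the client factory (`get_producer` in the 0.1.x series) forwards these keyword arguments to `Producer#initialize`; broker addresses and topic names are illustrative.

    require "kafka"

    kafka = Kafka.new(seed_brokers: ["kafka1:9092"], client_id: "my-app")

    # compression_codec picks the algorithm; compression_threshold defers
    # compression until at least 10 messages are buffered for a partition.
    producer = kafka.get_producer(
      compression_codec: :snappy,
      compression_threshold: 10,
    )

    100.times do |i|
      producer.produce("event-#{i}", topic: "events", partition_key: i.to_s)
    end

    # Message sets with 10 or more messages are compressed as a unit;
    # smaller sets are sent uncompressed.
    producer.deliver_messages

Because compression operates on whole message sets, a higher threshold trades nothing on small partitions and buys better compression ratios on bursty ones.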
@@ -114,27 +148,44 @@
     #   original attempt.
     #
     # @param retry_backoff [Integer] the number of seconds to wait between retries.
     #
     # @param max_buffer_size [Integer] the number of messages allowed in the buffer
-    #   before new writes will raise BufferOverflow exceptions.
+    #   before new writes will raise {BufferOverflow} exceptions.
     #
-    def initialize(cluster:, logger:, compression_codec: nil, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
+    # @param max_buffer_bytesize [Integer] the maximum size of the buffer in bytes.
+    #   Attempting to produce messages when the buffer reaches this size will
+    #   result in {BufferOverflow} being raised.
+    #
+    # @param compression_codec [Symbol, nil] the name of the compression codec to
+    #   use, or nil if no compression should be performed. Valid codecs: `:snappy`
+    #   and `:gzip`.
+    #
+    # @param compression_threshold [Integer] the number of messages that need to
+    #   be in a message set before it should be compressed. Note that message sets
+    #   are per-partition rather than per-topic or per-producer.
+    #
+    def initialize(cluster:, logger:, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000)
       @cluster = cluster
       @logger = logger
       @required_acks = required_acks
       @ack_timeout = ack_timeout
       @max_retries = max_retries
       @retry_backoff = retry_backoff
       @max_buffer_size = max_buffer_size
+      @max_buffer_bytesize = max_buffer_bytesize
       @compression_codec = Compression.find_codec(compression_codec)
+      @compression_threshold = compression_threshold
 
+      # The set of topics that are produced to.
+      @target_topics = Set.new
+
       # A buffer organized by topic/partition.
       @buffer = MessageBuffer.new
 
       # Messages added by `#produce` but not yet assigned a partition.
-      @pending_messages = []
+      @pending_message_queue = PendingMessageQueue.new
     end
 
     # Produces a message to the specified topic. Note that messages are buffered in
     # the producer until {#deliver_messages} is called.
     #
@@ -163,22 +214,37 @@
     # @param partition_key [String] the key that should be used to assign a partition.
     #
     # @raise [BufferOverflow] if the maximum buffer size has been reached.
     # @return [nil]
     def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
-      unless buffer_size < @max_buffer_size
-        raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
-      end
-
-      @pending_messages << PendingMessage.new(
+      message = PendingMessage.new(
         value: value,
         key: key,
         topic: topic,
         partition: partition,
         partition_key: partition_key,
       )
 
+      if buffer_size >= @max_buffer_size
+        raise BufferOverflow, "Max buffer size (#{@max_buffer_size} messages) exceeded"
+      end
+
+      if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
+        raise BufferOverflow, "Max buffer bytesize (#{@max_buffer_bytesize} bytes) exceeded"
+      end
+
+      @target_topics.add(topic)
+      @pending_message_queue.write(message)
+
+      Instrumentation.instrument("produce_message.producer.kafka", {
+        value: value,
+        key: key,
+        topic: topic,
+        buffer_size: buffer_size,
+        max_buffer_size: @max_buffer_size,
+      })
+
       nil
     end
 
     # Sends all buffered messages to the Kafka brokers.
     #
@@ -209,13 +275,17 @@
 
     # Returns the number of messages currently held in the buffer.
     #
     # @return [Integer] buffer size.
     def buffer_size
-      @pending_messages.size + @buffer.size
+      @pending_message_queue.size + @buffer.size
     end
 
+    def buffer_bytesize
+      @pending_message_queue.bytesize + @buffer.bytesize
+    end
+
     # Closes all connections to the brokers.
     #
     # @return [nil]
     def shutdown
       @cluster.disconnect
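The produce path now runs two independent capacity checks, on message count and on total bytesize, before enqueueing, and emits a notification for every accepted message. A common pattern is to treat the overflow as backpressure; this sketch assumes a `producer` built as above and an `event` string:

    begin
      producer.produce(event, topic: "events")
    rescue Kafka::BufferOverflow
      # Either limit tripped: too many buffered messages or too many bytes.
      # Flushing empties both the pending queue and the partition buffer.
      producer.deliver_messages
      retry
    end

The new `produce_message.producer.kafka` notification makes the same pressure observable. Assuming `Kafka::Instrumentation` is backed by ActiveSupport::Notifications, a subscriber could track buffer utilization:

    require "active_support/notifications"

    ActiveSupport::Notifications.subscribe("produce_message.producer.kafka") do |_name, _start, _finish, _id, payload|
      used = payload.fetch(:buffer_size).fdiv(payload.fetch(:max_buffer_size))
      warn "producer buffer #{(used * 100).round}% full" if used > 0.9
    end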
@@ -224,20 +294,19 @@
     private
 
     def deliver_messages_with_retries(notification)
       attempt = 0
 
-      # Make sure we get metadata for this topic.
-      target_topics = @pending_messages.map(&:topic).uniq
-      @cluster.add_target_topics(target_topics)
+      @cluster.add_target_topics(@target_topics)
 
       operation = ProduceOperation.new(
         cluster: @cluster,
         buffer: @buffer,
         required_acks: @required_acks,
         ack_timeout: @ack_timeout,
         compression_codec: @compression_codec,
+        compression_threshold: @compression_threshold,
         logger: @logger,
       )
 
       loop do
         attempt += 1
@@ -247,11 +316,11 @@
         @cluster.refresh_metadata_if_necessary!
 
         assign_partitions!
         operation.execute
 
-        if @pending_messages.empty? && @buffer.empty?
+        if buffer_size.zero?
           break
         elsif attempt <= @max_retries
           @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
 
           sleep @retry_backoff
@@ -274,14 +343,11 @@
         raise DeliveryFailed, "Failed to send messages to #{partitions}"
       end
     end
 
     def assign_partitions!
-      until @pending_messages.empty?
-        # We want to keep the message in the first-stage buffer in case there's an error.
-        message = @pending_messages.first
-
+      @pending_message_queue.dequeue_each do |message|
         partition = message.partition
 
         if partition.nil?
           partition_count = @cluster.partitions_for(message.topic).count
           partition = Partitioner.partition_for_key(partition_count, message)
@@ -291,12 +357,9 @@
           value: message.value,
           key: message.key,
           topic: message.topic,
           partition: partition,
         )
-
-        # Now it's safe to remove the message from the first-stage buffer.
-        @pending_messages.shift
       end
     rescue Kafka::Error => e
       @logger.error "Failed to assign pending message to a partition: #{e}"
       @cluster.mark_as_stale!
     end
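The old `until … first … shift` dance in `assign_partitions!` kept each message in the first-stage buffer until it was safely written, so an error mid-loop preserved the remainder for the next retry. The diff implies `PendingMessageQueue#dequeue_each` encapsulates exactly that, alongside the `size`/`bytesize` counters the overflow checks rely on. Here is a hypothetical reconstruction of those semantics, inferred purely from the call sites in this diff, not the actual 0.1.7 implementation:

    # Hypothetical stand-in for Kafka::PendingMessageQueue, inferred from
    # its call sites (write, size, bytesize, dequeue_each) in this diff.
    class SketchedPendingMessageQueue
      attr_reader :size, :bytesize

      def initialize
        @messages = []
        @size = 0
        @bytesize = 0
      end

      def write(message)
        @messages << message
        @size += 1
        @bytesize += message.bytesize
      end

      # Yields each message, removing it only after the block returns
      # normally. If the block raises, the current and remaining messages
      # stay queued for the next delivery attempt.
      def dequeue_each
        until @messages.empty?
          message = @messages.first
          yield message
          @messages.shift
          @size -= 1
          @bytesize -= message.bytesize
        end
      end
    end

With the counters maintained on write and dequeue, `buffer_size` and `buffer_bytesize` stay O(1) instead of walking the queue on every `produce` call.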