lib/kafka/producer.rb in ruby-kafka-0.1.6 vs lib/kafka/producer.rb in ruby-kafka-0.1.7
- old
+ new
@@ -1,8 +1,10 @@
+require "set"
require "kafka/partitioner"
require "kafka/message_buffer"
require "kafka/produce_operation"
+require "kafka/pending_message_queue"
require "kafka/pending_message"
require "kafka/compression"
module Kafka
@@ -45,14 +47,46 @@
#
# After this, we check if the buffer is empty. If it is, we're all done. If it's
# not, we do another round of requests, this time with just the remaining messages.
# We do this for as long as `max_retries` permits.
#
+ # ## Compression
+ #
+ # Depending on what kind of data you produce, enabling compression may yield improved
+ # bandwidth and space usage. Compression in Kafka is done on entire message sets
+ # rather than on individual messages. This improves the compression rate and generally
+ # means that compression works better the larger your buffers get, since the message
+ # sets will be larger by the time they're compressed.
+ #
+ # Since many workloads have variations in throughput and distribution across partitions,
+ # it's possible to configure a threshold for when to enable compression by setting
+ # `compression_threshold`. A partition's messages will only be compressed once at
+ # least that many of them have been buffered.
+ #
+ # Compression is enabled by passing the `compression_codec` parameter with the
+ # name of one of the algorithms allowed by Kafka:
+ #
+ # * `:snappy` for [Snappy](http://google.github.io/snappy/) compression.
+ # * `:gzip` for [gzip](https://en.wikipedia.org/wiki/Gzip) compression.
+ #
+ # By default, all message sets will be compressed if you specify a compression
+ # codec. To increase the compression threshold, set `compression_threshold` to
+ # an integer value higher than one.
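+ #
+ # For example, here is a sketch of enabling Snappy compression with a threshold
+ # of 10 messages per partition; the `cluster` and `logger` arguments are assumed
+ # to come from your own setup:
+ #
+ #     producer = Kafka::Producer.new(
+ #       cluster: cluster,
+ #       logger: logger,
+ #       compression_codec: :snappy,
+ #       compression_threshold: 10,
+ #     )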
+ #
# ## Instrumentation
#
+ # Whenever {#produce} is called, the notification `produce_message.producer.kafka`
+ # will be emitted with the following payload:
+ #
+ # * `value` – the message value.
+ # * `key` – the message key.
+ # * `topic` – the topic that was produced to.
+ # * `buffer_size` – the buffer size after adding the message.
+ # * `max_buffer_size` – the maximum allowed buffer size for the producer.
+ #
# After {#deliver_messages} completes, the notification
- # `deliver_messages.producer.kafka` will be emitted.
+ # `deliver_messages.producer.kafka` will be emitted with the following payload:
#
# * `message_count` – the total number of messages that the producer tried to
# deliver. Note that not all messages may get delivered.
# * `delivered_message_count` – the number of messages that were successfully
# delivered.
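+ #
+ # As a sketch, these notifications can be consumed with
+ # ActiveSupport::Notifications, assuming it is loaded in your application; the
+ # same pattern works for `produce_message.producer.kafka`:
+ #
+ #     ActiveSupport::Notifications.subscribe("deliver_messages.producer.kafka") do |*args|
+ #       event = ActiveSupport::Notifications::Event.new(*args)
+ #       payload = event.payload
+ #
+ #       puts "Delivered #{payload[:delivered_message_count]} of #{payload[:message_count]} messages"
+ #     end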
@@ -114,27 +148,44 @@
# original attempt.
#
# @param retry_backoff [Integer] the number of seconds to wait between retries.
#
# @param max_buffer_size [Integer] the number of messages allowed in the buffer
- # before new writes will raise BufferOverflow exceptions.
+ # before new writes will raise {BufferOverflow} exceptions.
#
- def initialize(cluster:, logger:, compression_codec: nil, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
+ # @param max_buffer_bytesize [Integer] the maximum size of the buffer in bytes.
+ # Attempting to produce messages when the buffer reaches this size will
+ # result in {BufferOverflow} being raised.
+ #
+ # @param compression_codec [Symbol, nil] the name of the compression codec to
+ # use, or nil if no compression should be performed. Valid codecs: `:snappy`
+ # and `:gzip`.
+ #
+ # @param compression_threshold [Integer] the number of messages that need to
+ # be in a message set before it should be compressed. Note that message sets
+ # are per-partition rather than per-topic or per-producer.
+ #
+ def initialize(cluster:, logger:, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000)
@cluster = cluster
@logger = logger
@required_acks = required_acks
@ack_timeout = ack_timeout
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
+ @max_buffer_bytesize = max_buffer_bytesize
@compression_codec = Compression.find_codec(compression_codec)
+ @compression_threshold = compression_threshold
+ # The set of topics that are produced to.
+ @target_topics = Set.new
+
# A buffer organized by topic/partition.
@buffer = MessageBuffer.new
# Messages added by `#produce` but not yet assigned a partition.
- @pending_messages = []
+ @pending_message_queue = PendingMessageQueue.new
end
# Produces a message to the specified topic. Note that messages are buffered in
# the producer until {#deliver_messages} is called.
#
@@ -163,22 +214,37 @@
# @param partition_key [String] the key that should be used to assign a partition.
#
# @raise [BufferOverflow] if the maximum buffer size has been reached.
# @return [nil]
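+ #
+ # @example A sketch of buffering a message (the topic name and key are hypothetical)
+ #   producer.produce("hello", key: "greeting", topic: "greetings")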
def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
- unless buffer_size < @max_buffer_size
- raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
- end
-
- @pending_messages << PendingMessage.new(
+ message = PendingMessage.new(
value: value,
key: key,
topic: topic,
partition: partition,
partition_key: partition_key,
)
+ if buffer_size >= @max_buffer_size
+ raise BufferOverflow, "Max buffer size (#{@max_buffer_size} messages) exceeded"
+ end
+
+ if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
+ raise BufferOverflow, "Max buffer bytesize (#{@max_buffer_bytesize} bytes) exceeded"
+ end
+
+ @target_topics.add(topic)
+ @pending_message_queue.write(message)
+
+ Instrumentation.instrument("produce_message.producer.kafka", {
+ value: value,
+ key: key,
+ topic: topic,
+ buffer_size: buffer_size,
+ max_buffer_size: @max_buffer_size,
+ })
+
nil
end
# Sends all buffered messages to the Kafka brokers.
#
@@ -209,13 +275,17 @@
# Returns the number of messages currently held in the buffer.
#
# @return [Integer] buffer size.
def buffer_size
- @pending_messages.size + @buffer.size
+ @pending_message_queue.size + @buffer.size
end
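+
+ # Returns the number of bytes currently held in the buffer.
+ #
+ # @return [Integer] buffer size in bytes.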
+ def buffer_bytesize
+ @pending_message_queue.bytesize + @buffer.bytesize
+ end
+
# Closes all connections to the brokers.
#
# @return [nil]
def shutdown
@cluster.disconnect
@@ -224,20 +294,19 @@
private
def deliver_messages_with_retries(notification)
attempt = 0
- # Make sure we get metadata for this topic.
- target_topics = @pending_messages.map(&:topic).uniq
- @cluster.add_target_topics(target_topics)
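+ # Make sure we get metadata for the topics we're producing to.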
+ @cluster.add_target_topics(@target_topics)
operation = ProduceOperation.new(
cluster: @cluster,
buffer: @buffer,
required_acks: @required_acks,
ack_timeout: @ack_timeout,
compression_codec: @compression_codec,
+ compression_threshold: @compression_threshold,
logger: @logger,
)
loop do
attempt += 1
@@ -247,11 +316,11 @@
@cluster.refresh_metadata_if_necessary!
assign_partitions!
operation.execute
- if @pending_messages.empty? && @buffer.empty?
+ if buffer_size.zero?
break
elsif attempt <= @max_retries
@logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
sleep @retry_backoff
@@ -274,14 +343,11 @@
raise DeliveryFailed, "Failed to send messages to #{partitions}"
end
end
def assign_partitions!
- until @pending_messages.empty?
- # We want to keep the message in the first-stage buffer in case there's an error.
- message = @pending_messages.first
-
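+ # Assign a partition to each pending message and move it into the
+ # per-partition message buffer.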
+ @pending_message_queue.dequeue_each do |message|
partition = message.partition
if partition.nil?
partition_count = @cluster.partitions_for(message.topic).count
partition = Partitioner.partition_for_key(partition_count, message)
@@ -291,12 +357,9 @@
value: message.value,
key: message.key,
topic: message.topic,
partition: partition,
)
-
- # Now it's safe to remove the message from the first-stage buffer.
- @pending_messages.shift
end
rescue Kafka::Error => e
@logger.error "Failed to assign pending message to a partition: #{e}"
@cluster.mark_as_stale!
end