lib/kafka/producer.rb in ruby-kafka-0.1.3 vs lib/kafka/producer.rb in ruby-kafka-0.1.4
- old
+ new
@@ -1,9 +1,9 @@
require "kafka/partitioner"
require "kafka/message_buffer"
-require "kafka/protocol/message"
-require "kafka/transmission"
+require "kafka/produce_operation"
+require "kafka/pending_message"
module Kafka
# Allows sending messages to a Kafka cluster.
#
@@ -15,11 +15,11 @@
#
# # Will instantiate Kafka::Producer
# producer = kafka.get_producer
#
# This is done in order to share a logger as well as a pool of broker connections across
- # different producers. This also means that you don't need to pass the `broker_pool` and
+ # different producers. This also means that you don't need to pass the `cluster` and
# `logger` options to `#get_producer`. See {#initialize} for the list of other options
# you can pass in.
#
# ## Buffering
#
@@ -84,12 +84,11 @@
#
class Producer
# Initializes a new Producer.
#
- # @param broker_pool [BrokerPool] the broker pool representing the cluster.
- # Typically passed in for you.
+ # @param cluster [Cluster] the cluster client. Typically passed in for you.
#
# @param logger [Logger] the logger that should be used. Typically passed
# in for you.
#
# @param ack_timeout [Integer] The number of seconds a broker can wait for
@@ -105,19 +104,24 @@
# @param retry_backoff [Integer] the number of seconds to wait between retries.
#
# @param max_buffer_size [Integer] the number of messages allowed in the buffer
# before new writes will raise BufferOverflow exceptions.
#
- def initialize(broker_pool:, logger:, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
- @broker_pool = broker_pool
+ def initialize(cluster:, logger:, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
+ @cluster = cluster
# The only signature change between the removed and added `def` lines is the
# rename of the `broker_pool:` keyword (and its instance variable) to
# `cluster:` / `@cluster`; the other keywords and their defaults are identical.
@logger = logger
@required_acks = required_acks
@ack_timeout = ack_timeout
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
+
+ # A buffer organized by topic/partition.
@buffer = MessageBuffer.new
+
+ # Messages added by `#produce` but not yet assigned a partition.
+ @pending_messages = []
# In the new version these two collections act as two stages: #produce appends
# to @pending_messages, and #assign_partitions! later moves each message into
# @buffer.
end
# Produces a message to the specified topic. Note that messages are buffered in
# the producer until {#send_messages} is called.
#
@@ -150,22 +154,19 @@
def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
# Refuse the write once #buffer_size has reached the configured maximum.
unless buffer_size < @max_buffer_size
raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
end
# Old version (removed lines): the partition was resolved immediately, using
# the message key as the partition key when none was given.
- if partition.nil?
- # If no explicit partition key is specified we use the message key instead.
- partition_key ||= key
- partitioner = Partitioner.new(@broker_pool.partitions_for(topic))
- partition = partitioner.partition_for_key(partition_key)
- end
# New version (added lines): the message is only recorded here, with whatever
# partition / partition_key the caller passed (possibly nil). Choosing a
# partition is deferred to #assign_partitions!.
+ @pending_messages << PendingMessage.new(
+ value: value,
+ key: key,
+ topic: topic,
+ partition: partition,
+ partition_key: partition_key,
+ )
- message = Protocol::Message.new(key: key, value: value)
-
- @buffer.write(message, topic: topic, partition: partition)
-
- partition
# Return value changes: the old version returned the chosen partition, the
# new version always returns nil.
+ nil
end
# Sends all buffered messages to the Kafka brokers.
#
# Depending on the value of `required_acks` used when initializing the producer,
@@ -176,34 +177,38 @@
# @raise [FailedToSendMessages] if not all messages could be successfully sent.
# @return [nil]
def send_messages
attempt = 0
- transmission = Transmission.new(
- broker_pool: @broker_pool,
+ # Make sure we get metadata for every topic that has pending messages.
+ target_topics = @pending_messages.map(&:topic).uniq
+ @cluster.add_target_topics(target_topics)
+
+ operation = ProduceOperation.new(
+ cluster: @cluster,
buffer: @buffer,
required_acks: @required_acks,
ack_timeout: @ack_timeout,
logger: @logger,
)
loop do
- @logger.info "Sending #{@buffer.size} messages"
-
attempt += 1
- transmission.send_messages
- if @buffer.empty?
- @logger.info "Successfully transmitted all messages"
+ @cluster.refresh_metadata_if_necessary!
+
+ assign_partitions!
+ operation.execute
+
+ if @pending_messages.empty? && @buffer.empty?
break
elsif attempt <= @max_retries
- @logger.warn "Failed to transmit all messages, retry #{attempt} of #{@max_retries}"
- @logger.info "Waiting #{@retry_backoff}s before retrying"
+ @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
sleep @retry_backoff
else
- @logger.error "Failed to transmit all messages; keeping remaining messages in buffer"
+ @logger.error "Failed to send all messages; keeping remaining messages in buffer"
break
end
end
if @required_acks == 0
@@ -222,16 +227,45 @@
# Returns the number of messages currently held in the buffer.
#
# @return [Integer] buffer size.
def buffer_size
# The new version also counts messages that are still waiting for a partition
# (@pending_messages), not just those already written to @buffer, so the
# limit check in #produce covers both stages.
- @buffer.size
+ @pending_messages.size + @buffer.size
end
# Closes all connections to the brokers.
#
# @return [nil]
def shutdown
# Delegates to the collaborator passed to #initialize: the old version called
# `shutdown` on the broker pool, the new version calls `disconnect` on the
# cluster.
- @broker_pool.shutdown
+ @cluster.disconnect
+ end
+
+ private
+
+ def assign_partitions!
# Drains @pending_messages into @buffer, choosing a partition for every
# message that was produced without an explicit one. Called from
# #send_messages on each attempt.
+ until @pending_messages.empty?
+ # We want to keep the message in the first-stage buffer in case there's an error.
+ message = @pending_messages.first
+
+ partition = message.partition
+
# No explicit partition: ask the cluster how many partitions the topic has
# and let Partitioner pick one; the whole pending message is passed to it.
+ if partition.nil?
+ partition_count = @cluster.partitions_for(message.topic).count
+ partition = Partitioner.partition_for_key(partition_count, message)
+ end
+
+ @buffer.write(
+ value: message.value,
+ key: message.key,
+ topic: message.topic,
+ partition: partition,
+ )
+
+ # Now it's safe to remove the message from the first-stage buffer.
+ @pending_messages.shift
+ end
# Method-level rescue: a Kafka::Error raised anywhere in the loop stops the
# drain. The message being processed has not been shifted yet, so it and
# everything after it remain in @pending_messages. The error is logged and
# not re-raised.
+ rescue Kafka::Error => e
+ @logger.error "Failed to assign pending message to a partition: #{e}"
# NOTE(review): presumably this makes the next
# @cluster.refresh_metadata_if_necessary! call in #send_messages fetch fresh
# metadata before the retry; confirm against the Cluster implementation.
+ @cluster.mark_as_stale!
end
end
end