lib/kafka/producer.rb in ruby-kafka-0.1.4 vs lib/kafka/producer.rb in ruby-kafka-0.1.5
- old
+ new
@@ -21,14 +21,14 @@
# `logger` options to `#get_producer`. See {#initialize} for the list of other options
# you can pass in.
#
# ## Buffering
#
- # The producer buffers pending messages until {#send_messages} is called. Note that there is
+ # The producer buffers pending messages until {#deliver_messages} is called. Note that there is
# a maximum buffer size (default is 1,000 messages) and writing messages after the
# buffer has reached this size will result in a BufferOverflow exception. Make sure
- # to periodically call {#send_messages} or set `max_buffer_size` to an appropriate value.
+ # to periodically call {#deliver_messages} or set `max_buffer_size` to an appropriate value.
#
# Buffering messages and sending them in batches greatly improves performance, so
# try to avoid sending messages after every write. The tradeoff between throughput and
# message delays depends on your use case.
#
@@ -44,10 +44,21 @@
#
# After this, we check if the buffer is empty. If it is, we're all done. If it's
# not, we do another round of requests, this time with just the remaining messages.
# We do this for as long as `max_retries` permits.
#
+ # ## Instrumentation
+ #
+ # After {#deliver_messages} completes, the notification
+ # `deliver_messages.producer.kafka` will be emitted.
+ #
+ # * `message_count` – the total number of messages that the producer tried to
+ # deliver. Note that not all messages may get delivered.
+ # * `delivered_message_count` – the number of messages that were successfully
+ # delivered.
+ # * `attempts` – the number of attempts made to deliver the messages.
+ #
# ## Example
#
# This is an example of an application which reads lines from stdin and writes them
# to Kafka:
#
@@ -71,15 +82,15 @@
# begin
# $stdin.each_with_index do |line, index|
# producer.produce(line, topic: topic)
#
# # Send messages for every 10 lines.
- # producer.send_messages if index % 10 == 0
+ # producer.deliver_messages if index % 10 == 0
# end
# ensure
# # Make sure to send any remaining messages.
- # producer.send_messages
+ # producer.deliver_messages
#
# producer.shutdown
# end
#
class Producer
@@ -121,11 +132,11 @@
# Messages added by `#produce` but not yet assigned a partition.
@pending_messages = []
end
# Produces a message to the specified topic. Note that messages are buffered in
- # the producer until {#send_messages} is called.
+ # the producer until {#deliver_messages} is called.
#
# ## Partitioning
#
# There are several options for specifying the partition that the message should
# be written to.
@@ -174,11 +185,45 @@
# the writes. The `ack_timeout` setting places an upper bound on the amount of
# time the call will block before failing.
#
# @raise [FailedToSendMessages] if not all messages could be successfully sent.
# @return [nil]
- def send_messages
+ def deliver_messages
+ # There's no need to do anything if the buffer is empty.
+ return if buffer_size == 0
+
+ Instrumentation.instrument("deliver_messages.producer.kafka") do |notification|
+ message_count = buffer_size
+
+ notification[:message_count] = message_count
+ notification[:attempts] = 0
+
+ begin
+ deliver_messages_with_retries(notification)
+ ensure
+ notification[:delivered_message_count] = message_count - buffer_size
+ end
+ end
+ end
+
+ # Returns the number of messages currently held in the buffer.
+ #
+ # @return [Integer] buffer size.
+ def buffer_size
+ @pending_messages.size + @buffer.size
+ end
+
+ # Closes all connections to the brokers.
+ #
+ # @return [nil]
+ def shutdown
+ @cluster.disconnect
+ end
+
+ private
+
+ def deliver_messages_with_retries(notification)
attempt = 0
# Make sure we get metadata for this topic.
target_topics = @pending_messages.map(&:topic).uniq
@cluster.add_target_topics(target_topics)
@@ -192,10 +237,12 @@
)
loop do
attempt += 1
+ notification[:attempts] = attempt
+
@cluster.refresh_metadata_if_necessary!
assign_partitions!
operation.execute
@@ -222,25 +269,9 @@
partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
raise FailedToSendMessages, "Failed to send messages to #{partitions}"
end
end
-
- # Returns the number of messages currently held in the buffer.
- #
- # @return [Integer] buffer size.
- def buffer_size
- @pending_messages.size + @buffer.size
- end
-
- # Closes all connections to the brokers.
- #
- # @return [nil]
- def shutdown
- @cluster.disconnect
- end
-
- private
def assign_partitions!
until @pending_messages.empty?
# We want to keep the message in the first-stage buffer in case there's an error.
message = @pending_messages.first