lib/kafka/producer.rb in ruby-kafka-0.1.4 vs lib/kafka/producer.rb in ruby-kafka-0.1.5

- old
+ new

@@ -21,14 +21,14 @@ # `logger` options to `#get_producer`. See {#initialize} for the list of other options # you can pass in. # # ## Buffering # - # The producer buffers pending messages until {#send_messages} is called. Note that there is + # The producer buffers pending messages until {#deliver_messages} is called. Note that there is # a maximum buffer size (default is 1,000 messages) and writing messages after the # buffer has reached this size will result in a BufferOverflow exception. Make sure - # to periodically call {#send_messages} or set `max_buffer_size` to an appropriate value. + # to periodically call {#deliver_messages} or set `max_buffer_size` to an appropriate value. # # Buffering messages and sending them in batches greatly improves performance, so # try to avoid sending messages after every write. The tradeoff between throughput and # message delays depends on your use case. # @@ -44,10 +44,21 @@ # # After this, we check if the buffer is empty. If it is, we're all done. If it's # not, we do another round of requests, this time with just the remaining messages. # We do this for as long as `max_retries` permits. # + # ## Instrumentation + # + # After {#deliver_messages} completes, the notification + # `deliver_messages.producer.kafka` will be emitted. + # + # * `message_count` – the total number of messages that the producer tried to + # deliver. Note that not all messages may get delivered. + # * `delivered_message_count` – the number of messages that were successfully + # delivered. + # * `attempts` – the number of attempts made to deliver the messages. + # # ## Example # # This is an example of an application which reads lines from stdin and writes them # to Kafka: # @@ -71,15 +82,15 @@ # begin # $stdin.each_with_index do |line, index| # producer.produce(line, topic: topic) # # # Send messages for every 10 lines. - # producer.send_messages if index % 10 == 0 + # producer.deliver_messages if index % 10 == 0 # end # ensure # # Make sure to send any remaining messages. - # producer.send_messages + # producer.deliver_messages # # producer.shutdown # end # class Producer @@ -121,11 +132,11 @@ # Messages added by `#produce` but not yet assigned a partition. @pending_messages = [] end # Produces a message to the specified topic. Note that messages are buffered in - # the producer until {#send_messages} is called. + # the producer until {#deliver_messages} is called. # # ## Partitioning # # There are several options for specifying the partition that the message should # be written to. @@ -174,11 +185,45 @@ # the writes. The `ack_timeout` setting places an upper bound on the amount of # time the call will block before failing. # # @raise [FailedToSendMessages] if not all messages could be successfully sent. # @return [nil] - def send_messages + def deliver_messages + # There's no need to do anything if the buffer is empty. + return if buffer_size == 0 + + Instrumentation.instrument("deliver_messages.producer.kafka") do |notification| + message_count = buffer_size + + notification[:message_count] = message_count + notification[:attempts] = 0 + + begin + deliver_messages_with_retries(notification) + ensure + notification[:delivered_message_count] = message_count - buffer_size + end + end + end + + # Returns the number of messages currently held in the buffer. + # + # @return [Integer] buffer size. + def buffer_size + @pending_messages.size + @buffer.size + end + + # Closes all connections to the brokers. + # + # @return [nil] + def shutdown + @cluster.disconnect + end + + private + + def deliver_messages_with_retries(notification) attempt = 0 # Make sure we get metadata for this topic. target_topics = @pending_messages.map(&:topic).uniq @cluster.add_target_topics(target_topics) @@ -192,10 +237,12 @@ ) loop do attempt += 1 + notification[:attempts] = attempt + @cluster.refresh_metadata_if_necessary! assign_partitions! operation.execute @@ -222,25 +269,9 @@ partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ") raise FailedToSendMessages, "Failed to send messages to #{partitions}" end end - - # Returns the number of messages currently held in the buffer. - # - # @return [Integer] buffer size. - def buffer_size - @pending_messages.size + @buffer.size - end - - # Closes all connections to the brokers. - # - # @return [nil] - def shutdown - @cluster.disconnect - end - - private def assign_partitions! until @pending_messages.empty? # We want to keep the message in the first-stage buffer in case there's an error. message = @pending_messages.first