lib/kafka/async_producer.rb in ruby-kafka-0.1.6 vs lib/kafka/async_producer.rb in ruby-kafka-0.1.7

- old
+ new

@@ -16,18 +16,31 @@ # will automatically deliver its messages every _n_ seconds. # # By default, automatic delivery is disabled and you'll have to call # {#deliver_messages} manually. # + # ## Buffer Overflow and Backpressure + # # The calling thread communicates with the background thread doing the actual # work using a thread safe queue. While the background thread is busy delivering # messages, new messages will be buffered in the queue. In order to avoid # the queue growing uncontrollably in cases where the background thread gets # stuck or can't follow the pace of the calling thread, there's a maximum # number of messages that is allowed to be buffered. You can configure this # value by setting `max_queue_size`. # + # If you produce messages faster than the background producer thread can + # deliver them to Kafka you will eventually fill the producer's buffer. Once + # this happens, the background thread will stop popping messages off the + # queue until it can successfully deliver the buffered messages. The queue + # will therefore grow in size, potentially hitting the `max_queue_size` limit. + # Once this happens, calls to {#produce} will raise a {BufferOverflow} error. + # + # Depending on your use case you may want to slow down the rate of messages + # being produced or perhaps halt your application completely until the + # producer can deliver the buffered messages and clear the message queue. + # # ## Example # # producer = kafka.async_producer( # # Keep at most 1.000 messages in the buffer before delivering: # delivery_threshold: 1000, @@ -89,10 +102,11 @@ # @param (see Kafka::Producer#produce) # @raise [BufferOverflow] if the message queue is full. # @return [nil] def produce(*args) raise BufferOverflow if @queue.size >= @max_queue_size + @queue << [:produce, args] nil end @@ -144,11 +158,11 @@ loop do operation, payload = @queue.pop case operation when :produce - @producer.produce(*payload) + produce(*payload) deliver_messages if threshold_reached? when :deliver_messages deliver_messages when :shutdown # Deliver any pending messages first. @@ -163,9 +177,16 @@ ensure @producer.shutdown end private + + def produce(*args) + @producer.produce(*args) + rescue BufferOverflow + deliver_messages + retry + end def deliver_messages @producer.deliver_messages rescue DeliveryFailed # Delivery failed.