lib/kafka/async_producer.rb in ruby-kafka-0.1.6 vs lib/kafka/async_producer.rb in ruby-kafka-0.1.7
- old
+ new
@@ -16,18 +16,31 @@
# will automatically deliver its messages every _n_ seconds.
#
# By default, automatic delivery is disabled and you'll have to call
# {#deliver_messages} manually.
#
+ # ## Buffer Overflow and Backpressure
+ #
# The calling thread communicates with the background thread doing the actual
# work using a thread safe queue. While the background thread is busy delivering
# messages, new messages will be buffered in the queue. In order to avoid
# the queue growing uncontrollably in cases where the background thread gets
# stuck or can't follow the pace of the calling thread, there's a maximum
# number of messages that is allowed to be buffered. You can configure this
# value by setting `max_queue_size`.
#
+ # If you produce messages faster than the background producer thread can
+ # deliver them to Kafka you will eventually fill the producer's buffer. Once
+ # this happens, the background thread will stop popping messages off the
+ # queue until it can successfully deliver the buffered messages. The queue
+ # will therefore grow in size, potentially hitting the `max_queue_size` limit.
+ # Once this happens, calls to {#produce} will raise a {BufferOverflow} error.
+ #
+ # Depending on your use case you may want to slow down the rate of messages
+ # being produced or perhaps halt your application completely until the
+ # producer can deliver the buffered messages and clear the message queue.
+ #
# ## Example
#
# producer = kafka.async_producer(
# # Keep at most 1.000 messages in the buffer before delivering:
# delivery_threshold: 1000,
@@ -89,10 +102,11 @@
# @param (see Kafka::Producer#produce)
# @raise [BufferOverflow] if the message queue is full.
# @return [nil]
def produce(*args)
raise BufferOverflow if @queue.size >= @max_queue_size
+
@queue << [:produce, args]
nil
end
@@ -144,11 +158,11 @@
loop do
operation, payload = @queue.pop
case operation
when :produce
- @producer.produce(*payload)
+ produce(*payload)
deliver_messages if threshold_reached?
when :deliver_messages
deliver_messages
when :shutdown
# Deliver any pending messages first.
@@ -163,9 +177,16 @@
ensure
@producer.shutdown
end
private
+
+ def produce(*args)
+ @producer.produce(*args)
+ rescue BufferOverflow
+ deliver_messages
+ retry
+ end
def deliver_messages
@producer.deliver_messages
rescue DeliveryFailed
# Delivery failed.