lib/kafka/async_producer.rb in ruby-kafka-0.7.6.beta1 vs lib/kafka/async_producer.rb in ruby-kafka-0.7.6.beta2
- old
+ new
@@ -70,11 +70,11 @@
# @param delivery_threshold [Integer] if greater than zero, the number of
# buffered messages that will automatically trigger a delivery.
# @param delivery_interval [Integer] if greater than zero, the number of
# seconds between automatic message deliveries.
#
- def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:, logger:)
+ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
raise ArgumentError unless max_queue_size > 0
raise ArgumentError unless delivery_threshold >= 0
raise ArgumentError unless delivery_interval >= 0
@queue = Queue.new
@@ -84,12 +84,14 @@
@worker = Worker.new(
queue: @queue,
producer: sync_producer,
delivery_threshold: delivery_threshold,
+ max_retries: max_retries,
+ retry_backoff: retry_backoff,
instrumenter: instrumenter,
- logger: logger,
+ logger: logger
)
# The timer will no-op if the delivery interval is zero.
@timer = Timer.new(queue: @queue, interval: delivery_interval)
end
@@ -182,14 +184,16 @@
end
end
end
class Worker
- def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:)
+ def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
@queue = queue
@producer = producer
@delivery_threshold = delivery_threshold
+ @max_retries = max_retries
+ @retry_backoff = retry_backoff
@instrumenter = instrumenter
@logger = TaggedLogger.new(logger)
end
def run
@@ -238,13 +242,25 @@
end
private
def produce(*args)
- @producer.produce(*args)
- rescue BufferOverflow
- deliver_messages
- retry
+ retries = 0
+ begin
+ @producer.produce(*args)
+ rescue BufferOverflow => e
+ deliver_messages
+ if @max_retries == -1
+ retry
+ elsif retries < @max_retries
+ retries += 1
+ sleep @retry_backoff**retries
+ retry
+ else
+ @logger.error("Failed to asynchronously produce messages due to BufferOverflow")
+ @instrumenter.instrument("error.async_producer", { error: e })
+ end
+ end
end
def deliver_messages
@producer.deliver_messages
rescue DeliveryFailed, ConnectionError => e