lib/kafka/async_producer.rb in ruby-kafka-0.7.6.beta1 vs lib/kafka/async_producer.rb in ruby-kafka-0.7.6.beta2

- old
+ new

@@ -70,11 +70,11 @@ # @param delivery_threshold [Integer] if greater than zero, the number of # buffered messages that will automatically trigger a delivery. # @param delivery_interval [Integer] if greater than zero, the number of # seconds between automatic message deliveries. # - def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:, logger:) + def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 @queue = Queue.new @@ -84,12 +84,14 @@ @worker = Worker.new( queue: @queue, producer: sync_producer, delivery_threshold: delivery_threshold, + max_retries: max_retries, + retry_backoff: retry_backoff, instrumenter: instrumenter, - logger: logger, + logger: logger ) # The timer will no-op if the delivery interval is zero. @timer = Timer.new(queue: @queue, interval: delivery_interval) end @@ -182,14 +184,16 @@ end end end class Worker - def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:) + def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) @queue = queue @producer = producer @delivery_threshold = delivery_threshold + @max_retries = max_retries + @retry_backoff = retry_backoff @instrumenter = instrumenter @logger = TaggedLogger.new(logger) end def run @@ -238,13 +242,25 @@ end private def produce(*args) - @producer.produce(*args) - rescue BufferOverflow - deliver_messages - retry + retries = 0 + begin + @producer.produce(*args) + rescue BufferOverflow => e + deliver_messages + if @max_retries == -1 + retry + elsif retries < @max_retries + retries += 1 + sleep @retry_backoff**retries + retry + else + @logger.error("Failed to asynchronously produce messages due to BufferOverflow") + @instrumenter.instrument("error.async_producer", { error: e }) + end + end end def deliver_messages @producer.deliver_messages rescue DeliveryFailed, ConnectionError => e