lib/kafka/async_producer.rb in ruby-kafka-0.3.12 vs lib/kafka/async_producer.rb in ruby-kafka-0.3.13.beta1
- old
+ new
@@ -67,18 +67,19 @@
# @param delivery_threshold [Integer] if greater than zero, the number of
# buffered messages that will automatically trigger a delivery.
# @param delivery_interval [Integer] if greater than zero, the number of
# seconds between automatic message deliveries.
#
- def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:)
+ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:, logger:)
raise ArgumentError unless max_queue_size > 0
raise ArgumentError unless delivery_threshold >= 0
raise ArgumentError unless delivery_interval >= 0
@queue = Queue.new
@max_queue_size = max_queue_size
@instrumenter = instrumenter
+ @logger = logger
@worker = Worker.new(
queue: @queue,
producer: sync_producer,
delivery_threshold: delivery_threshold,
@@ -100,10 +101,16 @@
buffer_overflow(topic) if @queue.size >= @max_queue_size
args = [value, **options.merge(topic: topic)]
@queue << [:produce, args]
+ @instrumenter.instrument("enqueue_message.async_producer", {
+ topic: topic,
+ queue_size: @queue.size,
+ max_queue_size: @max_queue_size,
+ })
+
nil
end
# Asynchronously delivers the buffered messages. This method will return
# immediately and the actual work will be done in the background.
@@ -120,10 +127,11 @@
# method will block until the buffered messages have been delivered.
#
# @see Kafka::Producer#shutdown
# @return [nil]
def shutdown
+ @timer_thread && @timer_thread.exit
@queue << [:shutdown, nil]
@worker_thread && @worker_thread.join
nil
end
@@ -146,9 +154,11 @@
def buffer_overflow(topic)
@instrumenter.instrument("buffer_overflow.producer", {
topic: topic,
})
+
+ @logger.error "Buffer overflow: failed to enqueue message for #{topic}"
raise BufferOverflow
end
class Timer