lib/kafka/async_producer.rb in ruby-kafka-0.3.12 vs lib/kafka/async_producer.rb in ruby-kafka-0.3.13.beta1

- old
+ new

@@ -67,18 +67,19 @@ # @param delivery_threshold [Integer] if greater than zero, the number of # buffered messages that will automatically trigger a delivery. # @param delivery_interval [Integer] if greater than zero, the number of # seconds between automatic message deliveries. # - def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:) + def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:, logger:) raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 @queue = Queue.new @max_queue_size = max_queue_size @instrumenter = instrumenter + @logger = logger @worker = Worker.new( queue: @queue, producer: sync_producer, delivery_threshold: delivery_threshold, @@ -100,10 +101,16 @@ buffer_overflow(topic) if @queue.size >= @max_queue_size args = [value, **options.merge(topic: topic)] @queue << [:produce, args] + @instrumenter.instrument("enqueue_message.async_producer", { + topic: topic, + queue_size: @queue.size, + max_queue_size: @max_queue_size, + }) + nil end # Asynchronously delivers the buffered messages. This method will return # immediately and the actual work will be done in the background. @@ -120,10 +127,11 @@ # method will block until the buffered messages have been delivered. # # @see Kafka::Producer#shutdown # @return [nil] def shutdown + @timer_thread && @timer_thread.exit @queue << [:shutdown, nil] @worker_thread && @worker_thread.join nil end @@ -146,9 +154,11 @@ def buffer_overflow(topic) @instrumenter.instrument("buffer_overflow.producer", { topic: topic, }) + + @logger.error "Buffer overflow: failed to enqueue message for #{topic}" raise BufferOverflow end class Timer