lib/kafka/async_producer.rb in ruby-kafka-0.3.4 vs lib/kafka/async_producer.rb in ruby-kafka-0.3.5

- old
+ new

@@ -67,46 +67,41 @@ # @param delivery_threshold [Integer] if greater than zero, the number of # buffered messages that will automatically trigger a delivery. # @param delivery_interval [Integer] if greater than zero, the number of # seconds between automatic message deliveries. # - def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0) + def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:) raise ArgumentError unless max_queue_size > 0 raise ArgumentError unless delivery_threshold >= 0 raise ArgumentError unless delivery_interval >= 0 @queue = Queue.new @max_queue_size = max_queue_size + @instrumenter = instrumenter - @worker_thread = Thread.new do - worker = Worker.new( - queue: @queue, - producer: sync_producer, - delivery_threshold: delivery_threshold, - ) + @worker = Worker.new( + queue: @queue, + producer: sync_producer, + delivery_threshold: delivery_threshold, + ) - worker.run - end - - @worker_thread.abort_on_exception = true - - if delivery_interval > 0 - Thread.new do - Timer.new(queue: @queue, interval: delivery_interval).run - end - end + # The timer will no-op if the delivery interval is zero. + @timer = Timer.new(queue: @queue, interval: delivery_interval) end # Produces a message to the specified topic. # # @see Kafka::Producer#produce # @param (see Kafka::Producer#produce) # @raise [BufferOverflow] if the message queue is full. # @return [nil] - def produce(*args) - raise BufferOverflow if @queue.size >= @max_queue_size + def produce(value, topic:, **options) + ensure_threads_running! + buffer_overflow(topic) if @queue.size >= @max_queue_size + + args = [value, **options.merge(topic: topic)] @queue << [:produce, args] nil end @@ -126,21 +121,48 @@ # # @see Kafka::Producer#shutdown # @return [nil] def shutdown @queue << [:shutdown, nil] - @worker_thread.join + @worker_thread && @worker_thread.join nil end + private + + def ensure_threads_running! + @worker_thread = nil unless @worker_thread && @worker_thread.alive? + @worker_thread ||= start_thread { @worker.run } + + @timer_thread = nil unless @timer_thread && @timer_thread.alive? + @timer_thread ||= start_thread { @timer.run } + end + + def start_thread(&block) + thread = Thread.new(&block) + thread.abort_on_exception = true + thread + end + + def buffer_overflow(topic) + @instrumenter.instrument("buffer_overflow.producer", { + topic: topic, + }) + + raise BufferOverflow + end + class Timer def initialize(interval:, queue:) @queue = queue @interval = interval end def run + # Permanently sleep if the timer interval is zero. + Thread.stop if @interval.zero? + loop do sleep(@interval) @queue << [:deliver_messages, nil] end end