lib/kafka/async_producer.rb in ruby-kafka-1.2.0 vs lib/kafka/async_producer.rb in ruby-kafka-1.3.0

- old
+ new

@@ -57,12 +57,10 @@ # # # Remember to shut down the producer when you're done with it. # producer.shutdown # class AsyncProducer - THREAD_MUTEX = Mutex.new - # Initializes a new AsyncProducer. # # @param sync_producer [Kafka::Producer] the synchronous producer that should # be used in the background. # @param max_queue_size [Integer] the maximum number of messages allowed in @@ -92,10 +90,12 @@ logger: logger ) # The timer will no-op if the delivery interval is zero. @timer = Timer.new(queue: @queue, interval: delivery_interval) + + @thread_mutex = Mutex.new end # Produces a message to the specified topic. # # @see Kafka::Producer#produce @@ -129,10 +129,12 @@ # immediately and the actual work will be done in the background. # # @see Kafka::Producer#deliver_messages # @return [nil] def deliver_messages + ensure_threads_running! + @queue << [:deliver_messages, nil] nil end @@ -140,31 +142,38 @@ # method will block until the buffered messages have been delivered. # # @see Kafka::Producer#shutdown # @return [nil] def shutdown + ensure_threads_running! + @timer_thread && @timer_thread.exit @queue << [:shutdown, nil] @worker_thread && @worker_thread.join nil end private def ensure_threads_running! - THREAD_MUTEX.synchronize do - @worker_thread = nil unless @worker_thread && @worker_thread.alive? - @worker_thread ||= Thread.new { @worker.run } - end + return if worker_thread_alive? && timer_thread_alive? - THREAD_MUTEX.synchronize do - @timer_thread = nil unless @timer_thread && @timer_thread.alive? - @timer_thread ||= Thread.new { @timer.run } + @thread_mutex.synchronize do + @worker_thread = Thread.new { @worker.run } unless worker_thread_alive? + @timer_thread = Thread.new { @timer.run } unless timer_thread_alive? end end + def worker_thread_alive? + !!@worker_thread && @worker_thread.alive? + end + + def timer_thread_alive? + !!@timer_thread && @timer_thread.alive? + end + def buffer_overflow(topic, message) @instrumenter.instrument("buffer_overflow.async_producer", { topic: topic, }) @@ -206,10 +215,10 @@ loop do operation, payload = @queue.pop case operation when :produce - produce(*payload) + produce(payload[0], **payload[1]) deliver_messages if threshold_reached? when :deliver_messages deliver_messages when :shutdown begin