lib/kafka/async_producer.rb in ruby-kafka-0.7.2 vs lib/kafka/async_producer.rb in ruby-kafka-0.7.3

- old
+ new

@@ -57,10 +57,11 @@ # # # Remember to shut down the producer when you're done with it. # producer.shutdown # class AsyncProducer + THREAD_MUTEX = Mutex.new # Initializes a new AsyncProducer. # # @param sync_producer [Kafka::Producer] the synchronous producer that should # be used in the background. @@ -144,14 +145,18 @@ end private def ensure_threads_running! - @worker_thread = nil unless @worker_thread && @worker_thread.alive? - @worker_thread ||= Thread.new { @worker.run } + THREAD_MUTEX.synchronize do + @worker_thread = nil unless @worker_thread && @worker_thread.alive? + @worker_thread ||= Thread.new { @worker.run } + end - @timer_thread = nil unless @timer_thread && @timer_thread.alive? - @timer_thread ||= Thread.new { @timer.run } + THREAD_MUTEX.synchronize do + @timer_thread = nil unless @timer_thread && @timer_thread.alive? + @timer_thread ||= Thread.new { @timer.run } + end end def buffer_overflow(topic, message) @instrumenter.instrument("buffer_overflow.async_producer", { topic: topic,