lib/kafka/async_producer.rb in ruby-kafka-0.7.2 vs lib/kafka/async_producer.rb in ruby-kafka-0.7.3
- old
+ new
@@ -57,10 +57,11 @@
#
# # Remember to shut down the producer when you're done with it.
# producer.shutdown
#
class AsyncProducer
+ THREAD_MUTEX = Mutex.new
# Initializes a new AsyncProducer.
#
# @param sync_producer [Kafka::Producer] the synchronous producer that should
# be used in the background.
@@ -144,14 +145,18 @@
end
private
def ensure_threads_running!
- @worker_thread = nil unless @worker_thread && @worker_thread.alive?
- @worker_thread ||= Thread.new { @worker.run }
+ THREAD_MUTEX.synchronize do
+ @worker_thread = nil unless @worker_thread && @worker_thread.alive?
+ @worker_thread ||= Thread.new { @worker.run }
+ end
- @timer_thread = nil unless @timer_thread && @timer_thread.alive?
- @timer_thread ||= Thread.new { @timer.run }
+ THREAD_MUTEX.synchronize do
+ @timer_thread = nil unless @timer_thread && @timer_thread.alive?
+ @timer_thread ||= Thread.new { @timer.run }
+ end
end
def buffer_overflow(topic, message)
@instrumenter.instrument("buffer_overflow.async_producer", {
topic: topic,