lib/kafka/async_producer.rb in ruby-kafka-1.2.0 vs lib/kafka/async_producer.rb in ruby-kafka-1.3.0
- old
+ new
@@ -57,12 +57,10 @@
#
# # Remember to shut down the producer when you're done with it.
# producer.shutdown
#
class AsyncProducer
- THREAD_MUTEX = Mutex.new
-
# Initializes a new AsyncProducer.
#
# @param sync_producer [Kafka::Producer] the synchronous producer that should
# be used in the background.
# @param max_queue_size [Integer] the maximum number of messages allowed in
@@ -92,10 +90,12 @@
logger: logger
)
# The timer will no-op if the delivery interval is zero.
@timer = Timer.new(queue: @queue, interval: delivery_interval)
+
+ @thread_mutex = Mutex.new
end
# Produces a message to the specified topic.
#
# @see Kafka::Producer#produce
@@ -129,10 +129,12 @@
# immediately and the actual work will be done in the background.
#
# @see Kafka::Producer#deliver_messages
# @return [nil]
def deliver_messages
+ ensure_threads_running!
+
@queue << [:deliver_messages, nil]
nil
end
@@ -140,31 +142,38 @@
# method will block until the buffered messages have been delivered.
#
# @see Kafka::Producer#shutdown
# @return [nil]
def shutdown
+ ensure_threads_running!
+
@timer_thread && @timer_thread.exit
@queue << [:shutdown, nil]
@worker_thread && @worker_thread.join
nil
end
private
def ensure_threads_running!
- THREAD_MUTEX.synchronize do
- @worker_thread = nil unless @worker_thread && @worker_thread.alive?
- @worker_thread ||= Thread.new { @worker.run }
- end
+ return if worker_thread_alive? && timer_thread_alive?
- THREAD_MUTEX.synchronize do
- @timer_thread = nil unless @timer_thread && @timer_thread.alive?
- @timer_thread ||= Thread.new { @timer.run }
+ @thread_mutex.synchronize do
+ @worker_thread = Thread.new { @worker.run } unless worker_thread_alive?
+ @timer_thread = Thread.new { @timer.run } unless timer_thread_alive?
end
end
+ def worker_thread_alive?
+ !!@worker_thread && @worker_thread.alive?
+ end
+
+ def timer_thread_alive?
+ !!@timer_thread && @timer_thread.alive?
+ end
+
def buffer_overflow(topic, message)
@instrumenter.instrument("buffer_overflow.async_producer", {
topic: topic,
})
@@ -206,10 +215,10 @@
loop do
operation, payload = @queue.pop
case operation
when :produce
- produce(*payload)
+ produce(payload[0], **payload[1])
deliver_messages if threshold_reached?
when :deliver_messages
deliver_messages
when :shutdown
begin