lib/kafka/async_producer.rb in ruby-kafka-0.3.4 vs lib/kafka/async_producer.rb in ruby-kafka-0.3.5
- old
+ new
@@ -67,46 +67,41 @@
# @param delivery_threshold [Integer] if greater than zero, the number of
# buffered messages that will automatically trigger a delivery.
# @param delivery_interval [Integer] if greater than zero, the number of
# seconds between automatic message deliveries.
#
- def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0)
+ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, instrumenter:)
raise ArgumentError unless max_queue_size > 0
raise ArgumentError unless delivery_threshold >= 0
raise ArgumentError unless delivery_interval >= 0
@queue = Queue.new
@max_queue_size = max_queue_size
+ @instrumenter = instrumenter
- @worker_thread = Thread.new do
- worker = Worker.new(
- queue: @queue,
- producer: sync_producer,
- delivery_threshold: delivery_threshold,
- )
+ @worker = Worker.new(
+ queue: @queue,
+ producer: sync_producer,
+ delivery_threshold: delivery_threshold,
+ )
- worker.run
- end
-
- @worker_thread.abort_on_exception = true
-
- if delivery_interval > 0
- Thread.new do
- Timer.new(queue: @queue, interval: delivery_interval).run
- end
- end
+ # The timer will no-op if the delivery interval is zero.
+ @timer = Timer.new(queue: @queue, interval: delivery_interval)
end
# Produces a message to the specified topic.
#
# @see Kafka::Producer#produce
# @param (see Kafka::Producer#produce)
# @raise [BufferOverflow] if the message queue is full.
# @return [nil]
- def produce(*args)
- raise BufferOverflow if @queue.size >= @max_queue_size
+ def produce(value, topic:, **options)
+ ensure_threads_running!
+ buffer_overflow(topic) if @queue.size >= @max_queue_size
+
+ args = [value, **options.merge(topic: topic)]
@queue << [:produce, args]
nil
end
@@ -126,21 +121,48 @@
#
# @see Kafka::Producer#shutdown
# @return [nil]
def shutdown
@queue << [:shutdown, nil]
- @worker_thread.join
+ @worker_thread && @worker_thread.join
nil
end
+ private
+
+ def ensure_threads_running!
+ @worker_thread = nil unless @worker_thread && @worker_thread.alive?
+ @worker_thread ||= start_thread { @worker.run }
+
+ @timer_thread = nil unless @timer_thread && @timer_thread.alive?
+ @timer_thread ||= start_thread { @timer.run }
+ end
+
+ def start_thread(&block)
+ thread = Thread.new(&block)
+ thread.abort_on_exception = true
+ thread
+ end
+
+ def buffer_overflow(topic)
+ @instrumenter.instrument("buffer_overflow.producer", {
+ topic: topic,
+ })
+
+ raise BufferOverflow
+ end
+
class Timer
def initialize(interval:, queue:)
@queue = queue
@interval = interval
end
def run
+ # Permanently sleep if the timer interval is zero.
+ Thread.stop if @interval.zero?
+
loop do
sleep(@interval)
@queue << [:deliver_messages, nil]
end
end