lib/kafka/async_producer.rb in ruby-kafka-0.5.1 vs lib/kafka/async_producer.rb in ruby-kafka-0.5.2.beta1
- old
+ new
@@ -98,11 +98,14 @@
# @raise [BufferOverflow] if the message queue is full.
# @return [nil]
def produce(value, topic:, **options)
ensure_threads_running!
- buffer_overflow(topic) if @queue.size >= @max_queue_size
+ if @queue.size >= @max_queue_size
+ buffer_overflow topic,
+ "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
+ end
args = [value, **options.merge(topic: topic)]
@queue << [:produce, args]
@instrumenter.instrument("enqueue_message.async_producer", {
@@ -146,17 +149,15 @@
@timer_thread = nil unless @timer_thread && @timer_thread.alive?
@timer_thread ||= Thread.new { @timer.run }
end
- def buffer_overflow(topic)
+ def buffer_overflow(topic, message)
@instrumenter.instrument("buffer_overflow.async_producer", {
topic: topic,
})
- @logger.error "Cannot produce message to #{topic}, max queue size (#{@max_queue_size}) reached"
-
- raise BufferOverflow
+ raise BufferOverflow, message
end
class Timer
def initialize(interval:, queue:)
@queue = queue