lib/kafka/async_producer.rb in ruby-kafka-0.5.1 vs lib/kafka/async_producer.rb in ruby-kafka-0.5.2.beta1

- old
+ new

@@ -98,11 +98,14 @@ # @raise [BufferOverflow] if the message queue is full. # @return [nil] def produce(value, topic:, **options) ensure_threads_running! - buffer_overflow(topic) if @queue.size >= @max_queue_size + if @queue.size >= @max_queue_size + buffer_overflow topic, + "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached" + end args = [value, **options.merge(topic: topic)] @queue << [:produce, args] @instrumenter.instrument("enqueue_message.async_producer", { @@ -146,17 +149,15 @@ @timer_thread = nil unless @timer_thread && @timer_thread.alive? @timer_thread ||= Thread.new { @timer.run } end - def buffer_overflow(topic) + def buffer_overflow(topic, message) @instrumenter.instrument("buffer_overflow.async_producer", { topic: topic, }) - @logger.error "Cannot produce message to #{topic}, max queue size (#{@max_queue_size}) reached" - - raise BufferOverflow + raise BufferOverflow, message end class Timer def initialize(interval:, queue:) @queue = queue