lib/kafka/producer.rb in ruby-kafka-0.3.4 vs lib/kafka/producer.rb in ruby-kafka-0.3.5
- old
+ new
@@ -192,15 +192,15 @@
partition_key: partition_key,
create_time: create_time,
)
if buffer_size >= @max_buffer_size
- raise BufferOverflow, "Max buffer size (#{@max_buffer_size} messages) exceeded"
+ buffer_overflow topic, "Max buffer size (#{@max_buffer_size} messages) exceeded"
end
if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
- raise BufferOverflow, "Max buffer bytesize (#{@max_buffer_bytesize} bytes) exceeded"
+ buffer_overflow topic, "Max buffer bytesize (#{@max_buffer_bytesize} bytes) exceeded"
end
@target_topics.add(topic)
@pending_message_queue.write(message)
@@ -359,8 +359,16 @@
@logger.error "Failed to assign partitions to #{failed_messages.count} messages"
@cluster.mark_as_stale!
end
@pending_message_queue.replace(failed_messages)
+ end
+
+ def buffer_overflow(topic, message)
+ @instrumenter.instrument("buffer_overflow.producer", {
+ topic: topic,
+ })
+
+ raise BufferOverflow, message
end
end
end