lib/kafka/producer.rb in ruby-kafka-0.3.4 vs lib/kafka/producer.rb in ruby-kafka-0.3.5

- old
+ new

@@ -192,15 +192,15 @@ partition_key: partition_key, create_time: create_time, ) if buffer_size >= @max_buffer_size - raise BufferOverflow, "Max buffer size (#{@max_buffer_size} messages) exceeded" + buffer_overflow topic, "Max buffer size (#{@max_buffer_size} messages) exceeded" end if buffer_bytesize + message.bytesize >= @max_buffer_bytesize - raise BufferOverflow, "Max buffer bytesize (#{@max_buffer_bytesize} bytes) exceeded" + buffer_overflow topic, "Max buffer bytesize (#{@max_buffer_bytesize} bytes) exceeded" end @target_topics.add(topic) @pending_message_queue.write(message) @@ -359,8 +359,16 @@ @logger.error "Failed to assign partitions to #{failed_messages.count} messages" @cluster.mark_as_stale! end @pending_message_queue.replace(failed_messages) + end + + def buffer_overflow(topic, message) + @instrumenter.instrument("buffer_overflow.producer", { + topic: topic, + }) + + raise BufferOverflow, message end end end