lib/kafka/producer.rb in ruby-kafka-0.3.3 vs lib/kafka/producer.rb in ruby-kafka-0.3.4

- old
+ new

@@ -325,27 +325,42 @@ raise DeliveryFailed, "Failed to send messages to #{partitions}" end end def assign_partitions! - @pending_message_queue.dequeue_each do |message| + failed_messages = [] + + @pending_message_queue.each do |message| partition = message.partition - if partition.nil? - partition_count = @cluster.partitions_for(message.topic).count - partition = Partitioner.partition_for_key(partition_count, message) + begin + if partition.nil? + partition_count = @cluster.partitions_for(message.topic).count + partition = Partitioner.partition_for_key(partition_count, message) + end + + @buffer.write( + value: message.value, + key: message.key, + topic: message.topic, + partition: partition, + create_time: message.create_time, + ) + rescue Kafka::Error => e + @instrumenter.instrument("topic_error.producer", { + topic: message.topic, + exception: [e.class.to_s, e.message], + }) + + failed_messages << message end + end - @buffer.write( - value: message.value, - key: message.key, - topic: message.topic, - partition: partition, - create_time: message.create_time, - ) + if failed_messages.any? + @logger.error "Failed to assign partitions to #{failed_messages.count} messages" + @cluster.mark_as_stale! end - rescue Kafka::Error => e - @logger.error "Failed to assign pending message to a partition: #{e}" - @cluster.mark_as_stale! + + @pending_message_queue.replace(failed_messages) end end end