lib/kafka/producer.rb in ruby-kafka-0.4.0.beta1 vs lib/kafka/producer.rb in ruby-kafka-0.4.0
- old
+ new
@@ -332,15 +332,23 @@
end
end
def assign_partitions!
failed_messages = []
+ topics_with_failures = Set.new
@pending_message_queue.each do |message|
partition = message.partition
begin
+ # If a message for a topic fails to receive a partition all subsequent
+ # messages for the topic should be retried to preserve ordering
+ if topics_with_failures.include?(message.topic)
+ failed_messages << message
+ next
+ end
+
if partition.nil?
partition_count = @cluster.partitions_for(message.topic).count
partition = Partitioner.partition_for_key(partition_count, message)
end
@@ -355,9 +363,10 @@
@instrumenter.instrument("topic_error.producer", {
topic: message.topic,
exception: [e.class.to_s, e.message],
})
+ topics_with_failures << message.topic
failed_messages << message
end
end
if failed_messages.any?