lib/kafka/producer.rb in ruby-kafka-0.4.0.beta1 vs lib/kafka/producer.rb in ruby-kafka-0.4.0

- old
+ new

@@ -332,15 +332,23 @@ end end def assign_partitions! failed_messages = [] + topics_with_failures = Set.new @pending_message_queue.each do |message| partition = message.partition begin + # If a message for a topic fails to receive a partition all subsequent + # messages for the topic should be retried to preserve ordering + if topics_with_failures.include?(message.topic) + failed_messages << message + next + end + if partition.nil? partition_count = @cluster.partitions_for(message.topic).count partition = Partitioner.partition_for_key(partition_count, message) end @@ -355,9 +363,10 @@ @instrumenter.instrument("topic_error.producer", { topic: message.topic, exception: [e.class.to_s, e.message], }) + topics_with_failures << message.topic failed_messages << message end end if failed_messages.any?