lib/kafka/producer.rb in ruby-kafka-0.3.3 vs lib/kafka/producer.rb in ruby-kafka-0.3.4
- old
+ new
@@ -325,27 +325,42 @@
raise DeliveryFailed, "Failed to send messages to #{partitions}"
end
end
def assign_partitions!
- @pending_message_queue.dequeue_each do |message|
+ failed_messages = []
+
+ @pending_message_queue.each do |message|
partition = message.partition
- if partition.nil?
- partition_count = @cluster.partitions_for(message.topic).count
- partition = Partitioner.partition_for_key(partition_count, message)
+ begin
+ if partition.nil?
+ partition_count = @cluster.partitions_for(message.topic).count
+ partition = Partitioner.partition_for_key(partition_count, message)
+ end
+
+ @buffer.write(
+ value: message.value,
+ key: message.key,
+ topic: message.topic,
+ partition: partition,
+ create_time: message.create_time,
+ )
+ rescue Kafka::Error => e
+ @instrumenter.instrument("topic_error.producer", {
+ topic: message.topic,
+ exception: [e.class.to_s, e.message],
+ })
+
+ failed_messages << message
end
+ end
- @buffer.write(
- value: message.value,
- key: message.key,
- topic: message.topic,
- partition: partition,
- create_time: message.create_time,
- )
+ if failed_messages.any?
+ @logger.error "Failed to assign partitions to #{failed_messages.count} messages"
+ @cluster.mark_as_stale!
end
- rescue Kafka::Error => e
- @logger.error "Failed to assign pending message to a partition: #{e}"
- @cluster.mark_as_stale!
+
+ @pending_message_queue.replace(failed_messages)
end
end
end