lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.8.1 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.8.2
- old
+ new
@@ -233,16 +233,16 @@
end
unless @pending_message_queue.empty?
# Mark the cluster as stale in order to force a cluster metadata refresh.
@cluster.mark_as_stale!
- raise DeliveryFailed, "Failed to assign partitions to #{@pending_message_queue.size} messages"
+ raise DeliveryFailed.new("Failed to assign partitions to #{@pending_message_queue.size} messages", buffer_messages)
end
unless @buffer.empty?
partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
- raise DeliveryFailed, "Failed to send messages to #{partitions}"
+ raise DeliveryFailed.new("Failed to send messages to #{partitions}", buffer_messages)
end
end
def assign_partitions!
failed_messages = []