lib/kafka/producer.rb in ruby-kafka-0.4.2 vs lib/kafka/producer.rb in ruby-kafka-0.4.3
- old
+ new
@@ -292,11 +292,15 @@
loop do
attempt += 1
notification[:attempts] = attempt
- @cluster.refresh_metadata_if_necessary!
+ begin
+ @cluster.refresh_metadata_if_necessary!
+ rescue ConnectionError => e
+ raise DeliveryFailed.new(e, buffer_messages)
+ end
assign_partitions!
operation.execute
if @required_acks.zero?
@@ -319,17 +323,17 @@
end
unless @pending_message_queue.empty?
# Mark the cluster as stale in order to force a cluster metadata refresh.
@cluster.mark_as_stale!
- raise DeliveryFailed, "Failed to assign partitions to #{@pending_message_queue.size} messages"
+ raise DeliveryFailed.new("Failed to assign partitions to #{@pending_message_queue.size} messages", buffer_messages)
end
unless @buffer.empty?
partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
- raise DeliveryFailed, "Failed to send messages to #{partitions}"
+ raise DeliveryFailed.new("Failed to send messages to #{partitions}", buffer_messages)
end
end
def assign_partitions!
failed_messages = []
@@ -376,9 +380,32 @@
@cluster.mark_as_stale!
end
@pending_message_queue.replace(failed_messages)
+ end
+
+ def buffer_messages
+ messages = []
+
+ @pending_message_queue.each do |message|
+ messages << message
+ end
+
+ @buffer.each do |topic, partition, messages_for_partition|
+ messages_for_partition.each do |message|
+ messages << PendingMessage.new(
+ message.value,
+ message.key,
+ topic,
+ partition,
+ nil,
+ message.create_time
+ )
+ end
+ end
+
+ messages
end
def buffer_overflow(topic, message)
@instrumenter.instrument("buffer_overflow.producer", {
topic: topic,