lib/kafka/producer.rb in ruby-kafka-0.4.2 vs lib/kafka/producer.rb in ruby-kafka-0.4.3

- old
+ new

@@ -292,11 +292,15 @@ loop do attempt += 1 notification[:attempts] = attempt - @cluster.refresh_metadata_if_necessary! + begin + @cluster.refresh_metadata_if_necessary! + rescue ConnectionError => e + raise DeliveryFailed.new(e, buffer_messages) + end assign_partitions! operation.execute if @required_acks.zero? @@ -319,17 +323,17 @@ end unless @pending_message_queue.empty? # Mark the cluster as stale in order to force a cluster metadata refresh. @cluster.mark_as_stale! - raise DeliveryFailed, "Failed to assign partitions to #{@pending_message_queue.size} messages" + raise DeliveryFailed.new("Failed to assign partitions to #{@pending_message_queue.size} messages", buffer_messages) end unless @buffer.empty? partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ") - raise DeliveryFailed, "Failed to send messages to #{partitions}" + raise DeliveryFailed.new("Failed to send messages to #{partitions}", buffer_messages) end end def assign_partitions! failed_messages = [] @@ -376,9 +380,32 @@ @cluster.mark_as_stale! end @pending_message_queue.replace(failed_messages) + end + + def buffer_messages + messages = [] + + @pending_message_queue.each do |message| + messages << message + end + + @buffer.each do |topic, partition, messages_for_partition| + messages_for_partition.each do |message| + messages << PendingMessage.new( + message.value, + message.key, + topic, + partition, + nil, + message.create_time + ) + end + end + + messages end def buffer_overflow(topic, message) @instrumenter.instrument("buffer_overflow.producer", { topic: topic,