lib/kafka/fetcher.rb in ruby-kafka-0.7.1.beta1 vs lib/kafka/fetcher.rb in ruby-kafka-0.7.1.beta2
- old
+ new
@@ -176,9 +176,19 @@
operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)
end
end
operation.execute
+ rescue UnknownTopicOrPartition
+ @logger.error "Failed to fetch from some partitions. Maybe a rebalance has happened? Refreshing cluster info."
+
+ # Our cluster information has become stale, we need to refresh it.
+ @cluster.refresh_metadata!
+
+ # Don't overwhelm the brokers in case this keeps happening.
+ sleep 10
+
+ retry
rescue NoPartitionsToFetchFrom
backoff = @max_wait_time > 0 ? @max_wait_time : 1
@logger.info "There are no partitions to fetch from, sleeping for #{backoff}s"
sleep backoff