lib/kafka/fetcher.rb in ruby-kafka-0.7.1.beta1 vs lib/kafka/fetcher.rb in ruby-kafka-0.7.1.beta2

- old
+ new

@@ -176,9 +176,19 @@ operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes) end end operation.execute + rescue UnknownTopicOrPartition + @logger.error "Failed to fetch from some partitions. Maybe a rebalance has happened? Refreshing cluster info." + + # Our cluster information has become stale, we need to refresh it. + @cluster.refresh_metadata! + + # Don't overwhelm the brokers in case this keeps happening. + sleep 10 + + retry rescue NoPartitionsToFetchFrom backoff = @max_wait_time > 0 ? @max_wait_time : 1 @logger.info "There are no partitions to fetch from, sleeping for #{backoff}s" sleep backoff