lib/kafka/fetcher.rb in ruby-kafka-0.7.0 vs lib/kafka/fetcher.rb in ruby-kafka-0.7.1.beta1
- old
+ new
@@ -142,12 +142,12 @@
partition: batch.partition,
offset_lag: batch.offset_lag,
highwater_mark_offset: batch.highwater_mark_offset,
message_count: batch.messages.count,
})
-
- @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1
end
+
+ @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1 unless batch.unknown_last_offset?
end
@queue << [:batches, batches]
rescue Kafka::NoPartitionsToFetchFrom
@logger.warn "No partitions to fetch from, sleeping for 1s"