lib/rdkafka/consumer.rb in karafka-rdkafka-0.13.10 vs lib/rdkafka/consumer.rb in karafka-rdkafka-0.14.0.beta1
- old
+ new
@@ -298,11 +298,11 @@
# @param topic [String] The topic to query
# @param partition [Integer] The partition to query
# @param timeout_ms [Integer] The timeout for querying the broker
# @return [Integer] The low and high watermark
# @raise [RdkafkaError] When querying the broker fails.
- def query_watermark_offsets(topic, partition, timeout_ms=200)
+ def query_watermark_offsets(topic, partition, timeout_ms=1000)
closed_consumer_check(__method__)
low = FFI::MemoryPointer.new(:int64, 1)
high = FFI::MemoryPointer.new(:int64, 1)
@@ -333,10 +333,10 @@
# @param topic_partition_list [TopicPartitionList] The list to calculate lag for.
# @param watermark_timeout_ms [Integer] The timeout for each query watermark call.
# @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag
# per partition
# @raise [RdkafkaError] When querying the broker fails.
- def lag(topic_partition_list, watermark_timeout_ms=100)
+ def lag(topic_partition_list, watermark_timeout_ms=1000)
out = {}
topic_partition_list.to_h.each do |topic, partitions|
# Query high watermarks for this topic's partitions
# and compare to the offset in the list.