lib/kafka/fetcher.rb in ruby-kafka-0.7.0.beta1 vs lib/kafka/fetcher.rb in ruby-kafka-0.7.0.beta2
- old
+ new
@@ -4,15 +4,16 @@
module Kafka
class Fetcher
attr_reader :queue
- def initialize(cluster:, logger:, instrumenter:, max_queue_size:)
+ def initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:)
@cluster = cluster
@logger = logger
@instrumenter = instrumenter
@max_queue_size = max_queue_size
+ @group = group
@queue = Queue.new
@commands = Queue.new
@next_offsets = Hash.new { |h, k| h[k] = {} }
@@ -120,10 +121,15 @@
@logger.info "Will fetch at most #{max_bytes_per_partition} bytes at a time per partition from #{topic}"
@max_bytes_per_partition[topic] = max_bytes_per_partition
end
def handle_seek(topic, partition, offset)
+ @instrumenter.instrument('seek.consumer',
+ group_id: @group.group_id,
+ topic: topic,
+ partition: partition,
+ offset: offset)
@logger.info "Seeking #{topic}/#{partition} to offset #{offset}"
@next_offsets[topic][partition] = offset
end
def step
@@ -136,12 +142,12 @@
partition: batch.partition,
offset_lag: batch.offset_lag,
highwater_mark_offset: batch.highwater_mark_offset,
message_count: batch.messages.count,
})
- end
- @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1
+ @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1
+ end
end
@queue << [:batches, batches]
rescue Kafka::NoPartitionsToFetchFrom
@logger.warn "No partitions to fetch from, sleeping for 1s"