lib/kafka/fetcher.rb in ruby-kafka-0.7.0.beta1 vs lib/kafka/fetcher.rb in ruby-kafka-0.7.0.beta2

- old
+ new

@@ -4,15 +4,16 @@ module Kafka class Fetcher attr_reader :queue - def initialize(cluster:, logger:, instrumenter:, max_queue_size:) + def initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:) @cluster = cluster @logger = logger @instrumenter = instrumenter @max_queue_size = max_queue_size + @group = group @queue = Queue.new @commands = Queue.new @next_offsets = Hash.new { |h, k| h[k] = {} } @@ -120,10 +121,15 @@ @logger.info "Will fetch at most #{max_bytes_per_partition} bytes at a time per partition from #{topic}" @max_bytes_per_partition[topic] = max_bytes_per_partition end def handle_seek(topic, partition, offset) + @instrumenter.instrument('seek.consumer', + group_id: @group.group_id, + topic: topic, + partition: partition, + offset: offset) @logger.info "Seeking #{topic}/#{partition} to offset #{offset}" @next_offsets[topic][partition] = offset end def step @@ -136,12 +142,12 @@ partition: batch.partition, offset_lag: batch.offset_lag, highwater_mark_offset: batch.highwater_mark_offset, message_count: batch.messages.count, }) - end - @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1 + @next_offsets[batch.topic][batch.partition] = batch.last_offset + 1 + end end @queue << [:batches, batches] rescue Kafka::NoPartitionsToFetchFrom @logger.warn "No partitions to fetch from, sleeping for 1s"