Sha256: d893b610a6f2d725fdfa2e8044b3696d59858857173ffef9c5d75a7215c16cc3
Contents?: true
Size: 1.64 KB
Versions: 3
Compression:
Stored size: 1.64 KB
Contents
module NulogyMessageBusConsumer module Tasks class LogConsumerLag attr_reader :interval def initialize(logger, interval_seconds, lag_timeout) @logger = logger @interval = interval_seconds @lag_timeout = lag_timeout end def extract_args(kafka_consumer:, **_) @kafka_consumer = kafka_consumer end def call # Delayed start. If we attempt to read consumer#committed immediately, it may fail. # We suspect this is because the consumer#committed is called before the consumer # has finished connecting. There appears to be a race condition. KafkaUtils.wait_for_assignment(@kafka_consumer) # Note: consumer#committed has a timeout of 1200ms. To respect our lag_timeout, use the largest. committed_timeout = [1200, @lag_timeout].max # The first parameter is a TopicPartitionList. When nil, it uses all the assigned ones. committed_offsets = @kafka_consumer.committed(nil, committed_timeout) lag_per_topic = @kafka_consumer.lag(committed_offsets, @lag_timeout) @logger.info(JSON.dump({ event: "consumer_lag", topics: Calculator.add_max_lag(lag_per_topic) })) $stdout.flush end module Calculator def self.add_max_lag(lag_by_topic) lag_by_topic.each_value do |lag_by_partition| lag_by_partition[:_max] = lag_by_partition.values.max || 0 end lag_by_topic[:_max] = lag_by_topic .map { |_topic, lag_by_partition| lag_by_partition[:_max] } .max || 0 lag_by_topic end end end end end
Version data entries
3 entries across 3 versions & 1 rubygems