Sha256: 3bc2461203f367335a9a686dc4d0bb10a5259548dde7b7064e8e52111d52821c
Contents?: true
Size: 1.53 KB
Versions: 3
Compression:
Stored size: 1.53 KB
Contents
module NulogyMessageBusConsumer
  module Steps
    # Pipeline step that starts a background thread which periodically logs
    # the consumer lag as a JSON document, then yields to the rest of the
    # pipeline.
    class LogConsumerLag
      # Seconds to wait between two consecutive lag reports.
      LOG_INTERVAL_SECONDS = 60

      # @param logger [#info] receives one JSON string per lag report
      def initialize(logger)
        @logger = logger
      end

      # Starts the lag-reporting thread and yields to the given block.
      #
      # @param kafka_consumer [#lag, #committed] consumer whose lag is reported
      #   (presumably an rdkafka consumer -- confirm against the caller)
      # @param _ [Hash] any other pipeline keywords; ignored
      # @yield the remainder of the pipeline
      # @return [Object] whatever the block returns
      def call(kafka_consumer:, **_)
        # Ensure that the process is terminated if there is a problem getting the consumption lag.
        # This also ensures that the process will terminate on-boot if it cannot connect to Kafka,
        # allowing the container to be terminated by ECS.
        # NOTE: this is the class-level setter, so it applies to every thread in the process.
        Thread.abort_on_exception = true

        Thread.new do
          # Delayed start. If we attempt to read consumer#committed immediately, it may fail.
          # We suspect this is because the consumer#committed is called before the consumer
          # has finished connecting. There appears to be a race condition.
          KafkaUtils.wait_for_assignment(kafka_consumer)

          loop do
            log_lag(kafka_consumer)
            sleep LOG_INTERVAL_SECONDS
          end
        end

        yield
      end

      # Helpers for summarising a lag report.
      module Calculator
        # Key under which the computed maxima are stored.
        MAX_KEY = :_max

        # Annotates a lag report, in place, with maximum-lag summaries.
        #
        # Every per-topic hash gains a +:_max+ entry holding the largest lag
        # among its partitions (0 when it has none), and the outer hash gains a
        # +:_max+ entry holding the largest of those per-topic maxima (0 when
        # there are no topics).
        #
        # Existing +:_max+ entries are treated as summaries rather than data:
        # they are skipped while computing and then overwritten, so the method
        # can be applied again to an already-annotated hash.
        #
        # @param lag_by_topic [Hash{Object => Hash{Object => Integer}}]
        #   topic => { partition => lag }
        # @return [Hash] the same +lag_by_topic+ object, mutated
        def self.add_max_lag(lag_by_topic)
          topic_maxima = []

          lag_by_topic.each do |topic, lag_by_partition|
            # Skip the overall summary left behind by a previous call.
            next if topic == MAX_KEY

            partition_lags = lag_by_partition.reject { |partition, _lag| partition == MAX_KEY }.values
            lag_by_partition[MAX_KEY] = partition_lags.max || 0
            topic_maxima << lag_by_partition[MAX_KEY]
          end

          lag_by_topic[MAX_KEY] = topic_maxima.max || 0
          lag_by_topic
        end
      end

      private

      # Reads the current lag from the consumer and logs it as a single JSON line.
      def log_lag(kafka_consumer)
        lag_per_topic = kafka_consumer.lag(kafka_consumer.committed)

        @logger.info(JSON.dump({
          event: "consumer_lag",
          topics: Calculator.add_max_lag(lag_per_topic)
        }))
        # Flush so the report is emitted promptly (assumes the logger writes to stdout).
        $stdout.flush
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems