module NulogyMessageBusConsumer
  module Tasks
    # Supervises the consumer's lag.
    #
    # If a partition's lag is non-zero and does not change for an extended period
    # of time, then kill the main thread.
    #
    # That period of time is check_interval_seconds * LagTracker#failing_checks.
    # With the defaults, that is 20 * 6 = 120 seconds = 2 minutes.
    #
    # Note that this strategy may not work for a busy integration.
    # Consumer lag monitoring should alert in that case.
    # However, this strategy may help alleviate alerts for low-traffic or off-peak
    # environments.
    #
    # We've come across cases where consumer lag is still being logged and
    # messages are being processed, but the consumer is not consuming messages
    # from particular topics.
    #
    # Killing the main thread causes ECS to restart the task.
    class SuperviseConsumerLag
      attr_reader :interval

      def initialize(logger, tracker: NulogyMessageBusConsumer::LagTracker.new(failing_checks: 6), killable: nil, check_interval_seconds: 20)
        @logger = logger
        @tracker = tracker
        @killable = killable
        @interval = check_interval_seconds
      end

      def extract_args(kafka_consumer:, **_)
        @consumer = kafka_consumer
        @killable ||= Thread.current
      end

      def call
        NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment(@consumer)

        @tracker.update(@consumer.lag(@consumer.committed))

        if @tracker.failing?
          log_failed_partitions
          @killable.kill
          Thread.current.exit
        end
      end

      private

      def log_failed_partitions
        seconds = @interval * @tracker.failing_checks
        failed = @tracker
          .failed
          .map { |topic, partitions| "#{topic}: #{partitions.join(",")}" }
          .join(", ")

        @logger.warn(JSON.dump({
          event: "message_processing_warning",
          message: "Assigned partition lag has not changed in #{seconds} seconds: #{failed}"
        }))
        $stdout.flush
      end
    end
  end
end
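
# A minimal usage sketch of the expected call sequence. The supervision loop
# below, the `logger` and `kafka_consumer` variables, and the manual calls to
# `extract_args`/`call` are illustrative assumptions, not this gem's actual
# wiring (presumably the consumer pipeline invokes the task itself):
#
#   supervisor = NulogyMessageBusConsumer::Tasks::SuperviseConsumerLag.new(logger)
#   supervisor.extract_args(kafka_consumer: kafka_consumer)
#
#   Thread.new do
#     loop do
#       sleep supervisor.interval
#       supervisor.call
#     end
#   end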