Sha256: 554a449e6d2d9184b450c26bb8d7e143ebeaa77e42457ca60c526b9e0c8262c4
Contents?: true
Size: 1.26 KB
Versions: 7
Compression:
Stored size: 1.26 KB
Contents
require "set"

module NulogyMessageBusConsumer
  # Counts, per topic partition, how many consecutive #update calls reported
  # the same non-zero lag value. A partition whose streak reaches
  # +failing_checks+ is recorded as failed.
  #
  # NOTE(review): nothing in this class ever removes a partition from the
  # failed set, so a failure stays reported even if the lag later changes or
  # drops to zero -- confirm this is the intended contract.
  class LagTracker
    # Number of consecutive unchanged, non-zero readings after which a
    # partition is recorded as failed.
    attr_reader :failing_checks

    # failing_checks - streak length at which a partition is marked failed
    #                  (defaults to 3).
    def initialize(failing_checks: 3)
      @failing_checks = failing_checks
      # topic => { partition => [last_value, unchanged_streak] }
      @tracked = Hash.new { |store, topic| store[topic] = {} }
      # topic => Set of partitions that reached the failing threshold
      @failed = Hash.new { |store, topic| store[topic] = Set.new }
    end

    # Records a new round of readings.
    #
    # topic_partitions - mapping of topic => { partition => value }.
    def update(topic_partitions)
      topic_partitions.each_pair do |topic, partitions|
        partitions.each_pair do |partition, value|
          update_topic_partition(topic, partition, value)
        end
      end
    end

    # True once at least one partition has been recorded as failed.
    def failing?
      !@failed.empty?
    end

    # Returns a plain Hash of topic => sorted Array of failed partitions.
    def failed
      @failed.each_with_object({}) do |(topic, partitions), report|
        report[topic] = partitions.sort
      end
    end

    private

    # Folds one reading into the tracked state for (topic, partition) and
    # marks the partition as failed when its streak reaches the threshold.
    def update_topic_partition(topic, partition, value)
      # A never-seen partition yields nil here, so both locals start as nil.
      previous_value, streak = @tracked[topic][partition]

      # Only an unchanged *non-zero* value extends the streak; a changed
      # value or a zero value starts over at 0.
      stalled = previous_value == value && !value.zero?
      next_value = stalled ? previous_value : value
      next_streak = stalled ? streak + 1 : 0

      @tracked[topic][partition] = [next_value, next_streak]

      @failed[topic] << partition if next_streak >= @failing_checks
    end

    # Returns the tracked [value, streak] pair for the partition, or nil when
    # the partition has not been seen yet.
    def exists?(topic, partition)
      @tracked.dig(topic, partition)
    end
  end
end
Version data entries
7 entries across 7 versions & 1 rubygems