Sha256: 554a449e6d2d9184b450c26bb8d7e143ebeaa77e42457ca60c526b9e0c8262c4

Contents?: true

Size: 1.26 KB

Versions: 7

Compression:

Stored size: 1.26 KB

Contents

require "set"

module NulogyMessageBusConsumer
  # Watches per-partition lag readings across successive #update calls and
  # flags partitions whose lag is "stuck": the same non-zero reading repeated
  # on consecutive updates.
  #
  # A partition is marked as failed once its reading has repeated
  # +failing_checks+ times in a row. Nothing in this class ever removes a
  # partition from the failed set, so a failure stays reported even if later
  # readings change or drop to zero.
  class LagTracker
    # Number of consecutive repeated non-zero readings that marks a partition
    # as failed.
    attr_reader :failing_checks

    # failing_checks - repeat threshold at which a partition is considered
    #                  failed (defaults to 3).
    def initialize(failing_checks: 3)
      @failing_checks = failing_checks
      # topic => { partition => [last_reading, consecutive_repeat_count] }
      @tracked = Hash.new { |by_topic, topic| by_topic[topic] = {} }
      # topic => Set of partitions that reached the threshold
      @failed = Hash.new { |by_topic, topic| by_topic[topic] = Set.new }
    end

    # Records one round of readings.
    #
    # topic_partitions - nested mapping of topic => { partition => reading };
    #                    both levels only need to respond to #each_pair.
    #                    Readings are presumably integer lag values (they must
    #                    respond to #zero?) - confirm against the caller.
    #
    # Returns whatever the outer #each_pair returns (the receiver for a Hash).
    def update(topic_partitions)
      topic_partitions.each_pair do |topic, readings|
        readings.each_pair { |partition, reading| update_topic_partition(topic, partition, reading) }
      end
    end

    # True once at least one partition has been marked as failed.
    def failing?
      !@failed.empty?
    end

    # Failed partitions grouped by topic, as a plain Hash of
    # topic => sorted Array of partitions.
    def failed
      @failed.each_with_object({}) do |(topic, partitions), summary|
        summary[topic] = partitions.sort
      end
    end

    private

    # Folds a single reading into the tracked state for topic/partition and
    # marks the partition as failed when its repeat count reaches the
    # threshold.
    def update_topic_partition(topic, partition, value)
      readings = @tracked[topic]
      previous_value, repeats = readings[partition]

      # Only an unchanged, non-zero reading extends the streak; a first
      # sighting, a changed reading, or a zero reading restarts it at 0.
      stuck = previous_value == value && !value.zero?
      entry = stuck ? [previous_value, repeats + 1] : [value, 0]
      readings[partition] = entry

      @failed[topic] << partition if entry.last >= @failing_checks
    end

    # Returns the stored [reading, repeat_count] pair for topic/partition
    # (truthy), or nil when nothing has been recorded yet. Not called from
    # within this class.
    def exists?(topic, partition)
      @tracked.dig(topic, partition)
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-3.0.0 lib/nulogy_message_bus_consumer/lag_tracker.rb
nulogy_message_bus_consumer-2.0.1 lib/nulogy_message_bus_consumer/lag_tracker.rb
nulogy_message_bus_consumer-2.0.0 lib/nulogy_message_bus_consumer/lag_tracker.rb
nulogy_message_bus_consumer-1.0.0 lib/nulogy_message_bus_consumer/lag_tracker.rb
nulogy_message_bus_consumer-0.5.0 lib/nulogy_message_bus_consumer/lag_tracker.rb
nulogy_message_bus_consumer-1.0.0.alpha lib/nulogy_message_bus_consumer/lag_tracker.rb
nulogy_message_bus_consumer-0.4.0 lib/nulogy_message_bus_consumer/lag_tracker.rb