Sha256: d893b610a6f2d725fdfa2e8044b3696d59858857173ffef9c5d75a7215c16cc3

Contents?: true

Size: 1.64 KB

Versions: 3

Compression:

Stored size: 1.64 KB

Contents

module NulogyMessageBusConsumer
  module Tasks
    # Task that measures how far the Kafka consumer is behind and writes the
    # result to the logger as a single JSON "consumer_lag" event.
    #
    # Lifecycle as visible in this file: construct with a logger and timing
    # settings, hand over the consumer through #extract_args, then invoke #call.
    class LogConsumerLag
      # The `interval_seconds` value given to the constructor.
      # NOTE(review): presumably read by whatever schedules this task - confirm.
      attr_reader :interval

      # @param logger [#info] receives the JSON-encoded lag report
      # @param interval_seconds [Numeric] exposed unchanged through #interval
      # @param lag_timeout [Numeric] timeout handed to consumer#lag; also raises
      #   the consumer#committed timeout when it exceeds 1200
      def initialize(logger, interval_seconds, lag_timeout)
        @lag_timeout = lag_timeout
        @interval = interval_seconds
        @logger = logger
      end

      # Picks the Kafka consumer out of the supplied keyword arguments and
      # remembers it for #call. Any other keywords are ignored.
      #
      # @param kafka_consumer [Object] must respond to #committed and #lag
      # @return [Object] the consumer that was passed in
      def extract_args(kafka_consumer:, **_)
        @kafka_consumer = kafka_consumer
      end

      # Computes the current lag and logs it, then flushes $stdout.
      #
      # @return [IO] the result of `$stdout.flush`
      def call
        # Hold off until partitions are assigned. Reading consumer#committed
        # straight away may fail - suspected race with the consumer still
        # connecting.
        KafkaUtils.wait_for_assignment(@kafka_consumer)

        lag_per_topic = @kafka_consumer.lag(fetch_committed_offsets, @lag_timeout)
        report = {
          event: "consumer_lag",
          topics: Calculator.add_max_lag(lag_per_topic)
        }

        @logger.info(JSON.dump(report))
        $stdout.flush
      end

      private

      # Reads the committed offsets from the consumer.
      #
      # consumer#committed has a timeout of 1200ms of its own; the larger of
      # that and our lag_timeout is used so the configured value is respected.
      def fetch_committed_offsets
        timeout = [1200, @lag_timeout].max
        # A nil TopicPartitionList means "every assigned partition".
        @kafka_consumer.committed(nil, timeout)
      end

      # Helpers for annotating a lag report with maxima.
      module Calculator
        # Adds a `:_max` entry to every per-partition hash and one to the
        # top-level hash. An empty collection yields a maximum of 0.
        #
        # The argument is modified in place and is also the return value.
        #
        # @param lag_by_topic [Hash] topic => { partition => lag }
        # @return [Hash] the same hash, now carrying the `:_max` keys
        def self.add_max_lag(lag_by_topic)
          # Each block result is the value just stored as that topic's maximum.
          topic_maxima = lag_by_topic.each_value.map do |partition_lags|
            partition_lags[:_max] = partition_lags.values.max || 0
          end

          lag_by_topic[:_max] = topic_maxima.max || 0
          lag_by_topic
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-3.0.0 lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb
nulogy_message_bus_consumer-2.0.1 lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb
nulogy_message_bus_consumer-2.0.0 lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb