Sha256: 3bc2461203f367335a9a686dc4d0bb10a5259548dde7b7064e8e52111d52821c

Contents?: true

Size: 1.53 KB

Versions: 3

Compression:

Stored size: 1.53 KB

Contents

module NulogyMessageBusConsumer
  module Steps
    # Pipeline step that spawns a background thread which periodically logs the
    # consumer's lag as a JSON "consumer_lag" event, then hands control to the
    # block given to #call.
    class LogConsumerLag
      # @param logger object responding to #info; receives one JSON string per report
      def initialize(logger)
        @logger = logger
      end

      # Starts the lag-reporting thread and then yields.
      #
      # @param kafka_consumer consumer object; #committed and #lag are called on it
      # @return whatever the given block returns
      def call(kafka_consumer:, **_)
        # A failure while reading the lag must take the whole process down. That
        # includes failing to reach Kafka at boot, so that ECS can terminate the
        # container. Note that this flag is process-wide.
        Thread.abort_on_exception = true

        Thread.new { report_lag_forever(kafka_consumer) }

        yield
      end

      # Pure helpers for decorating lag data with maxima.
      module Calculator
        class << self
          # Adds a :_max entry to every per-partition hash and to the outer hash.
          # An empty collection yields a maximum of 0.
          #
          # The argument is modified in place and is also the return value.
          #
          # @param lag_by_topic [Hash] topic => { partition => lag }
          # @return [Hash] the same hash, now carrying the :_max entries
          def add_max_lag(lag_by_topic)
            topic_maxima = lag_by_topic.values.map do |partition_lags|
              # The assignment's value (the topic's maximum) is what gets collected.
              partition_lags[:_max] = partition_lags.values.max || 0
            end

            lag_by_topic[:_max] = topic_maxima.max || 0
            lag_by_topic
          end
        end
      end

      private

      # Body of the background thread: waits for assignment, then reports once
      # a minute until the process exits.
      def report_lag_forever(consumer)
        # Reading consumer#committed straight away may fail. The suspicion is a
        # race where it is called before the consumer has finished connecting,
        # so hold off until KafkaUtils.wait_for_assignment returns.
        KafkaUtils.wait_for_assignment(consumer)

        loop do
          log_current_lag(consumer)
          sleep 60
        end
      end

      # Emits a single "consumer_lag" log line and flushes stdout.
      def log_current_lag(consumer)
        committed_offsets = consumer.committed
        lag_per_topic = consumer.lag(committed_offsets)
        payload = {
          event: "consumer_lag",
          topics: Calculator.add_max_lag(lag_per_topic)
        }

        @logger.info(JSON.dump(payload))
        $stdout.flush
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-0.5.0 lib/nulogy_message_bus_consumer/steps/log_consumer_lag.rb
nulogy_message_bus_consumer-1.0.0.alpha lib/nulogy_message_bus_consumer/steps/log_consumer_lag.rb
nulogy_message_bus_consumer-0.4.0 lib/nulogy_message_bus_consumer/steps/log_consumer_lag.rb