Sha256: e1bf626d132c37fa1379c57ceef8150ec35eeb1df07ab1c7c162a71924d2cd5c

Contents?: true

Size: 1.66 KB

Versions: 34

Compression:

Stored size: 1.66 KB

Contents

# frozen_string_literal: true

module Karafka
  module Instrumentation
    module Callbacks
      # Statistics callback handler
      # @see `WaterDrop::Instrumentation::Callbacks::Statistics` for details on why we decorate
      #   those statistics
      class Statistics
        # @param subscription_group_id [String] id of the current subscription group
        # @param consumer_group_id [String] id of the current consumer group
        # @param client_name [String] rdkafka client name
        # @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
        def initialize(subscription_group_id, consumer_group_id, client_name, monitor)
          @subscription_group_id = subscription_group_id
          @consumer_group_id = consumer_group_id
          @client_name = client_name
          @monitor = monitor
          @statistics_decorator = ::Karafka::Core::Monitoring::StatisticsDecorator.new
        end

        # Emits decorated statistics to the monitor
        # @param statistics [Hash] rdkafka statistics
        def call(statistics)
          # Emit only statistics related to our client
          # rdkafka does not have per-instance statistics hook, thus we need to make sure that we
          # emit only stats that are related to current producer. Otherwise we would emit all of
          # all the time.
          return unless @client_name == statistics['name']

          @monitor.instrument(
            'statistics.emitted',
            subscription_group_id: @subscription_group_id,
            consumer_group_id: @consumer_group_id,
            statistics: @statistics_decorator.call(statistics)
          )
        end
      end
    end
  end
end

Version data entries

34 entries across 34 versions & 1 rubygems

Version Path
karafka-2.0.32 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.31 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.30 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.29 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.28 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.27 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.26 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.24 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.23 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.22 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.21 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.20 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.19 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.18 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.17 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.16 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.15 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.14 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.13 lib/karafka/instrumentation/callbacks/statistics.rb
karafka-2.0.12 lib/karafka/instrumentation/callbacks/statistics.rb