Sha256: 53179581f23f5f5dd99c32a7cde36cf9207d534732896e942fc876bc249e9eb5

Contents?: true

Size: 1.84 KB

Versions: 16

Compression:

Stored size: 1.84 KB

Contents

# frozen_string_literal: true

module Karafka
  module Instrumentation
    # Namespace for callbacks that carry information coming from rdkafka
    module Callbacks
      # Callback invoked when a consumer error occurs; it is published in a background thread.
      # It forwards the error to the monitor as an `error.occurred` event.
      class Error
        include Helpers::ConfigImporter.new(
          monitor: %i[monitor]
        )

        # @param subscription_group_id [String] id of the current subscription group instance
        # @param consumer_group_id [String] id of the current consumer group
        # @param client_name [String] rdkafka client name this callback is bound to
        def initialize(subscription_group_id, consumer_group_id, client_name)
          @client_name = client_name
          @consumer_group_id = consumer_group_id
          @subscription_group_id = subscription_group_id
        end

        # Instruments the given error through the monitor
        #
        # @param client_name [String] name of the rdkafka client that reported the error
        # @param error [Rdkafka::Error] error that occurred
        # @note Errors reported for a client name other than the one given to the constructor
        #   are skipped
        def call(client_name, error)
          # Only errors of our own client are emitted. The statistics callback filters in the
          # same way (more explanation there)
          return if @client_name != client_name

          instrument_error('librdkafka.error', error)
        rescue StandardError => e
          # A failure raised above is reported through the same event, under a separate type
          instrument_error('callbacks.error.error', e)
        end

        private

        # Publishes an `error.occurred` event enriched with the ids of this callback
        #
        # @param type [String] value of the `type` key in the event payload
        # @param error [Exception] error object placed under the `error` key
        def instrument_error(type, error)
          monitor.instrument(
            'error.occurred',
            caller: self,
            subscription_group_id: @subscription_group_id,
            consumer_group_id: @consumer_group_id,
            type: type,
            error: error
          )
        end
      end
    end
  end
end

Version data entries

16 entries across 16 versions & 1 rubygems

Version Path
karafka-2.4.17 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.16 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.15 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.14 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.13 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.12 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.11 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.10 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.9 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.8 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.7 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.6 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.5 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.4 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.3 lib/karafka/instrumentation/callbacks/error.rb
karafka-2.4.0 lib/karafka/instrumentation/callbacks/error.rb