lib/karafka/instrumentation/callbacks/error.rb in karafka-2.4.0.rc1 vs lib/karafka/instrumentation/callbacks/error.rb in karafka-2.4.0
- old
+ new
@@ -4,10 +4,14 @@
module Instrumentation
# Callbacks used to transport things from rdkafka
module Callbacks
# Callback that kicks in when consumer error occurs and is published in a background thread
class Error
+ include Helpers::ConfigImporter.new(
+ monitor: %i[monitor]
+ )
+
# @param subscription_group_id [String] id of the current subscription group instance
# @param consumer_group_id [String] id of the current consumer group
# @param client_name [String] rdkafka client name
def initialize(subscription_group_id, consumer_group_id, client_name)
@subscription_group_id = subscription_group_id
@@ -22,16 +26,25 @@
def call(client_name, error)
# Emit only errors related to our client
# Same as with statistics (mor explanation there)
return unless @client_name == client_name
- ::Karafka.monitor.instrument(
+ monitor.instrument(
'error.occurred',
caller: self,
subscription_group_id: @subscription_group_id,
consumer_group_id: @consumer_group_id,
type: 'librdkafka.error',
error: error
+ )
+ rescue StandardError => e
+ monitor.instrument(
+ 'error.occurred',
+ caller: self,
+ subscription_group_id: @subscription_group_id,
+ consumer_group_id: @consumer_group_id,
+ type: 'callbacks.error.error',
+ error: e
)
end
end
end
end