Sha256: 8651915dd8e8826a788653dde962979a7a621f5c6d54249dc14635a69516d978
Contents?: true
Size: 1.3 KB
Versions: 8
Compression:
Stored size: 1.3 KB
Contents
# frozen_string_literal: true

module Sbmt
  module KafkaConsumer
    module Instrumentation
      # Logger listener that extends Karafka's stock LoggerListener with
      # handlers for the consumer-specific events named in this class.
      #
      # NOTE(review): `logger`, `error_message` and `log_backtrace` are not
      # defined in this file; presumably they come from ListenerHelper or the
      # parent listener - confirm.
      class LoggerListener < Karafka::Instrumentation::LoggerListener
        include ListenerHelper

        # Error types logged by this class instead of the parent listener.
        CUSTOM_ERROR_TYPES = %w[consumer.base.consume_one consumer.inbox.consume_one].freeze

        # Logs consumer-specific errors under tagged context; every other
        # error type is handed to the parent implementation untouched.
        #
        # @param event [#[]] error event exposing :type and :error (and
        #   :status for inbox errors)
        def on_error_occurred(event)
          error_type = event[:type]
          exception = event[:error]

          # Anything outside our own error types belongs to the default handler.
          return super unless CUSTOM_ERROR_TYPES.include?(error_type)

          # :type is always tagged; :status is added only for inbox errors.
          log_tags = {type: error_type}
          log_tags[:status] = event[:status] if error_type == "consumer.inbox.consume_one"

          logger.tagged(**log_tags) do
            logger.error(error_message(exception))
            log_backtrace(exception)
          end
        end

        # BaseConsumer events

        # Logs a successfully consumed message together with the reported time.
        #
        # @param event [#payload] event whose payload carries :time
        def on_consumer_consumed_one(event)
          elapsed_ms = event.payload[:time]
          logger.info("Successfully consumed message in #{elapsed_ms} ms")
        end

        # InboxConsumer events

        # Logs a successfully consumed inbox message, tagged with its status.
        #
        # @param event [#[], #payload] event exposing :status and
        #   :message_uuid, with :time in its payload
        def on_consumer_inbox_consumed_one(event)
          status_tag = {status: event[:status]}

          logger.tagged(**status_tag) do
            uuid = event[:message_uuid]
            elapsed_ms = event.payload[:time]
            logger.info("Successfully consumed message with uuid: #{uuid} in #{elapsed_ms} ms")
          end
        end
      end
    end
  end
end
Version data entries
8 entries across 8 versions & 1 rubygems