Sha256: ddcdbbb8c249c492649dd8143f398762e74c761cea9c0c66e48002985c6066eb
Contents?: true
Size: 1.56 KB
Versions: 7
Compression:
Stored size: 1.56 KB
Contents
# frozen_string_literal: true module Sbmt module KafkaConsumer module Instrumentation class LoggerListener < Karafka::Instrumentation::LoggerListener include ListenerHelper CUSTOM_ERROR_TYPES = %w[consumer.base.consume_one consumer.inbox.consume_one].freeze def on_error_occurred(event) type = event[:type] error = event[:error] # catch here only consumer-specific errors # and let default handler to process other return super unless CUSTOM_ERROR_TYPES.include?(type) tags = {} tags[:status] = event[:status] if type == "consumer.inbox.consume_one" logger.tagged( type: type, **tags ) do logger.error(error_message(error)) log_backtrace(error) end end # BaseConsumer events def on_consumer_consumed_one(event) logger.info("Successfully consumed message in #{event.payload[:time]} ms") end def on_consumer_mark_as_consumed(event) logger.info("Processing message in #{event.payload[:time]} ms") end def on_consumer_process_message(event) logger.info("Commit offset in #{event.payload[:time]} ms") end # InboxConsumer events def on_consumer_inbox_consumed_one(event) logger.tagged(status: event[:status]) do logger.info("Successfully consumed message with uuid: #{event[:message_uuid]} in #{event.payload[:time]} ms") end end end end end end
Version data entries
7 entries across 7 versions & 1 rubygems