Sha256: ddcdbbb8c249c492649dd8143f398762e74c761cea9c0c66e48002985c6066eb

Contents?: true

Size: 1.56 KB

Versions: 7

Compression:

Stored size: 1.56 KB

Contents

# frozen_string_literal: true

module Sbmt
  module KafkaConsumer
    module Instrumentation
      class LoggerListener < Karafka::Instrumentation::LoggerListener
        include ListenerHelper
        CUSTOM_ERROR_TYPES = %w[consumer.base.consume_one consumer.inbox.consume_one].freeze

        def on_error_occurred(event)
          type = event[:type]
          error = event[:error]

          # catch here only consumer-specific errors
          # and let default handler to process other
          return super unless CUSTOM_ERROR_TYPES.include?(type)

          tags = {}
          tags[:status] = event[:status] if type == "consumer.inbox.consume_one"

          logger.tagged(
            type: type,
            **tags
          ) do
            logger.error(error_message(error))
            log_backtrace(error)
          end
        end

        # BaseConsumer events
        def on_consumer_consumed_one(event)
          logger.info("Successfully consumed message in #{event.payload[:time]} ms")
        end

        def on_consumer_mark_as_consumed(event)
          logger.info("Processing message in #{event.payload[:time]} ms")
        end

        def on_consumer_process_message(event)
          logger.info("Commit offset in #{event.payload[:time]} ms")
        end

        # InboxConsumer events
        def on_consumer_inbox_consumed_one(event)
          logger.tagged(status: event[:status]) do
            logger.info("Successfully consumed message with uuid: #{event[:message_uuid]} in #{event.payload[:time]} ms")
          end
        end
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
sbmt-kafka_consumer-3.1.0 lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb
sbmt-kafka_consumer-2.8.0 lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb
sbmt-kafka_consumer-3.0.0 lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb
sbmt-kafka_consumer-2.7.1 lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb
sbmt-kafka_consumer-2.7.0 lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb
sbmt-kafka_consumer-2.6.1 lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb
sbmt-kafka_consumer-2.6.0 lib/sbmt/kafka_consumer/instrumentation/logger_listener.rb