Sha256: dacef660f8058d097820ee78b23d4620b5e2df112b8c6a79bfdcc3a3847a15b6

Contents?: true

Size: 1.88 KB

Versions: 10

Compression:

Stored size: 1.88 KB

Contents

# frozen_string_literal: true

module WaterDrop
  module Instrumentation
    module Callbacks
      # Callable that is run for each delivery report. It forwards the outcome to the monitor as
      # either a `message.acknowledged` event or an `error.occurred` event.
      #
      # @note We don't have to provide client_name here as this callback is per client instance
      class Delivery
        # @param producer_id [String] id of the current producer
        # @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
        def initialize(producer_id, monitor)
          @producer_id = producer_id
          @monitor = monitor
        end

        # Emits delivery details to the monitor
        #
        # A report is considered successful only when its error code is zero (a missing/nil
        # error is coerced to zero via `to_i`). Any other code is reported as an error. This
        # includes negative codes, which librdkafka uses for local (client-side) failures such
        # as a message timeout, so checking only for positive codes would report those failed
        # deliveries as acknowledged.
        #
        # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
        def call(delivery_report)
          if delivery_report.error.to_i.zero?
            instrument_acknowledged(delivery_report)
          else
            instrument_error(delivery_report)
          end
        end

        private

        # Publishes an `error.occurred` event describing a failed delivery. The raw error code
        # from the report is wrapped in an `Rdkafka::RdkafkaError`.
        #
        # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
        def instrument_error(delivery_report)
          @monitor.instrument(
            'error.occurred',
            caller: self,
            error: ::Rdkafka::RdkafkaError.new(delivery_report.error),
            producer_id: @producer_id,
            offset: delivery_report.offset,
            partition: delivery_report.partition,
            topic: delivery_report.topic_name,
            type: 'librdkafka.dispatch_error'
          )
        end

        # Publishes a `message.acknowledged` event describing a successful delivery.
        #
        # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
        def instrument_acknowledged(delivery_report)
          @monitor.instrument(
            'message.acknowledged',
            producer_id: @producer_id,
            offset: delivery_report.offset,
            partition: delivery_report.partition,
            topic: delivery_report.topic_name
          )
        end
      end
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
waterdrop-2.6.7 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.6.6 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.6.5 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.6.4 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.6.3 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.6.2 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.6.1 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.6.1.beta1 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.6.0 lib/waterdrop/instrumentation/callbacks/delivery.rb
waterdrop-2.5.3 lib/waterdrop/instrumentation/callbacks/delivery.rb