Sha256: c6b7f2cbb1a56e60b8036d6d6f4bf50392736690891bfd1448e7673cde6119c1
Contents?: true
Size: 1.97 KB
Versions: 2
Compression:
Stored size: 1.97 KB
Contents
# frozen_string_literal: true module WaterDrop module Instrumentation module Callbacks # Creates a callable that we want to run upon each message delivery or failure # # @note We don't have to provide client_name here as this callback is per client instance class Delivery # @param producer_id [String] id of the current producer # @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using def initialize(producer_id, monitor) @producer_id = producer_id @monitor = monitor end # Emits delivery details to the monitor # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report def call(delivery_report) if delivery_report.error.to_i.zero? instrument_acknowledged(delivery_report) else instrument_error(delivery_report) end end private # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report def instrument_error(delivery_report) @monitor.instrument( 'error.occurred', caller: self, error: ::Rdkafka::RdkafkaError.new(delivery_report.error), producer_id: @producer_id, offset: delivery_report.offset, partition: delivery_report.partition, topic: delivery_report.topic_name, delivery_report: delivery_report, type: 'librdkafka.dispatch_error' ) end # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report def instrument_acknowledged(delivery_report) @monitor.instrument( 'message.acknowledged', producer_id: @producer_id, offset: delivery_report.offset, partition: delivery_report.partition, topic: delivery_report.topic_name, delivery_report: delivery_report ) end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
waterdrop-2.6.9 | lib/waterdrop/instrumentation/callbacks/delivery.rb |
waterdrop-2.6.8 | lib/waterdrop/instrumentation/callbacks/delivery.rb |