lib/waterdrop/instrumentation/callbacks/delivery.rb in waterdrop-2.6.9 vs lib/waterdrop/instrumentation/callbacks/delivery.rb in waterdrop-2.6.10

- old
+ new

@@ -4,54 +4,98 @@ module Instrumentation module Callbacks # Creates a callable that we want to run upon each message delivery or failure # # @note We don't have to provide client_name here as this callback is per client instance + # + # @note We do not consider `message.purge` as an error for transactional producers, because + # this is a standard behaviour for not yet dispatched messages on aborted transactions. + # We do however still want to instrument it for traceability. class Delivery + # Error emitted when a message was not yet dispatched and was purged from the queue + RD_KAFKA_RESP_PURGE_QUEUE = -152 + + # Error emitted when a message was purged while it was dispatched + RD_KAFKA_RESP_PURGE_INFLIGHT = -151 + + # Errors related to queue purging that is expected in transactions + PURGE_ERRORS = [RD_KAFKA_RESP_PURGE_INFLIGHT, RD_KAFKA_RESP_PURGE_QUEUE].freeze + + private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT, :PURGE_ERRORS + # @param producer_id [String] id of the current producer + # @param transactional [Boolean] is this handle for a transactional or regular producer # @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using - def initialize(producer_id, monitor) + def initialize(producer_id, transactional, monitor) @producer_id = producer_id + @transactional = transactional @monitor = monitor end # Emits delivery details to the monitor # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report def call(delivery_report) - if delivery_report.error.to_i.zero? + error_code = delivery_report.error.to_i + + if error_code.zero? instrument_acknowledged(delivery_report) + + elsif @transactional && PURGE_ERRORS.include?(error_code) + instrument_purged(delivery_report) else instrument_error(delivery_report) end end private # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report - def instrument_error(delivery_report) + def instrument_acknowledged(delivery_report) @monitor.instrument( - 'error.occurred', + 'message.acknowledged', caller: self, - error: ::Rdkafka::RdkafkaError.new(delivery_report.error), producer_id: @producer_id, offset: delivery_report.offset, partition: delivery_report.partition, topic: delivery_report.topic_name, - delivery_report: delivery_report, - type: 'librdkafka.dispatch_error' + delivery_report: delivery_report ) end # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report - def instrument_acknowledged(delivery_report) + def instrument_purged(delivery_report) @monitor.instrument( - 'message.acknowledged', + 'message.purged', + caller: self, + error: build_error(delivery_report), producer_id: @producer_id, offset: delivery_report.offset, partition: delivery_report.partition, topic: delivery_report.topic_name, delivery_report: delivery_report ) + end + + # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report + def instrument_error(delivery_report) + @monitor.instrument( + 'error.occurred', + caller: self, + error: build_error(delivery_report), + producer_id: @producer_id, + offset: delivery_report.offset, + partition: delivery_report.partition, + topic: delivery_report.topic_name, + delivery_report: delivery_report, + type: 'librdkafka.dispatch_error' + ) + end + + # Builds appropriate rdkafka error + # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report + # @return [::Rdkafka::RdkafkaError] + def build_error(delivery_report) + ::Rdkafka::RdkafkaError.new(delivery_report.error) end end end end end