lib/waterdrop/instrumentation/callbacks/delivery.rb in waterdrop-2.4.11 vs lib/waterdrop/instrumentation/callbacks/delivery.rb in waterdrop-2.5.0

- old
+ new

@@ -15,9 +15,33 @@ end # Emits delivery details to the monitor # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report def call(delivery_report) + if delivery_report.error.to_i.positive? + instrument_error(delivery_report) + else + instrument_acknowledged(delivery_report) + end + end + + private + + # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report + def instrument_error(delivery_report) + @monitor.instrument( + 'error.occurred', + caller: self, + error: ::Rdkafka::RdkafkaError.new(delivery_report.error), + producer_id: @producer_id, + offset: delivery_report.offset, + partition: delivery_report.partition, + type: 'librdkafka.dispatch_error' + ) + end + + # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report + def instrument_acknowledged(delivery_report) @monitor.instrument( 'message.acknowledged', producer_id: @producer_id, offset: delivery_report.offset, partition: delivery_report.partition