lib/waterdrop/instrumentation/callbacks/delivery.rb in waterdrop-2.4.11 vs lib/waterdrop/instrumentation/callbacks/delivery.rb in waterdrop-2.5.0
- old
+ new
@@ -15,9 +15,33 @@
end
# Emits delivery details to the monitor
# @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
def call(delivery_report)
+ if delivery_report.error.to_i.positive?
+ instrument_error(delivery_report)
+ else
+ instrument_acknowledged(delivery_report)
+ end
+ end
+
+ private
+
+ # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
+ def instrument_error(delivery_report)
+ @monitor.instrument(
+ 'error.occurred',
+ caller: self,
+ error: ::Rdkafka::RdkafkaError.new(delivery_report.error),
+ producer_id: @producer_id,
+ offset: delivery_report.offset,
+ partition: delivery_report.partition,
+ type: 'librdkafka.dispatch_error'
+ )
+ end
+
+ # @param delivery_report [Rdkafka::Producer::DeliveryReport] delivery report
+ def instrument_acknowledged(delivery_report)
@monitor.instrument(
'message.acknowledged',
producer_id: @producer_id,
offset: delivery_report.offset,
partition: delivery_report.partition