lib/deimos/instrumentation.rb in deimos-ruby-1.23.2 vs lib/deimos/instrumentation.rb in deimos-ruby-1.23.3
- old
+ new
@@ -41,40 +41,45 @@
include Instrumentation
# This module listens to events published by RubyKafka.
module KafkaListener
+ # @param exception [Exception]
+ def self.handle_exception_with_messages(exception)
+ messages = exception.failed_messages
+ messages.group_by(&:topic).each do |topic, batch|
+ producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
+ next if batch.empty? || !producer
+
+ decoder = Deimos.schema_backend(schema: producer.config[:schema],
+ namespace: producer.config[:namespace])
+ payloads = batch.map { |m| decoder.decode(m.value) }
+
+ Deimos.config.metrics&.increment(
+ 'publish_error',
+ tags: %W(topic:#{topic}),
+ by: payloads.size
+ )
+ Deimos.instrument(
+ 'produce_error',
+ producer: producer,
+ topic: topic,
+ exception_object: exception,
+ payloads: payloads
+ )
+ end
+ end
+
# Listens for any exceptions that happen during publishing and re-publishes
# as a Deimos event.
# @param event [ActiveSupport::Notifications::Event]
# @return [void]
def self.send_produce_error(event)
exception = event.payload[:exception_object]
return unless exception
if exception.respond_to?(:failed_messages)
- messages = exception.failed_messages
- messages.group_by(&:topic).each do |topic, batch|
- producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
- next if batch.empty? || !producer
-
- decoder = Deimos.schema_backend(schema: producer.config[:schema],
- namespace: producer.config[:namespace])
- payloads = batch.map { |m| decoder.decode(m.value) }
-
- Deimos.config.metrics&.increment(
- 'publish_error',
- tags: %W(topic:#{topic}),
- by: payloads.size
- )
- Deimos.instrument(
- 'produce_error',
- producer: producer,
- topic: topic,
- exception_object: exception,
- payloads: payloads
- )
- end
+ handle_exception_with_messages(exception)
else
Deimos.config.metrics&.increment(
'publish_error',
by: event.payload[:message_count] || 1
)