lib/deimos/instrumentation.rb in deimos-ruby-1.23.2 vs lib/deimos/instrumentation.rb in deimos-ruby-1.23.3

- old
+ new

@@ -41,40 +41,45 @@ include Instrumentation # This module listens to events published by RubyKafka. module KafkaListener + # @param exception [Exception] + def self.handle_exception_with_messages(exception) + messages = exception.failed_messages + messages.group_by(&:topic).each do |topic, batch| + producer = Deimos::Producer.descendants.find { |c| c.topic == topic } + next if batch.empty? || !producer + + decoder = Deimos.schema_backend(schema: producer.config[:schema], + namespace: producer.config[:namespace]) + payloads = batch.map { |m| decoder.decode(m.value) } + + Deimos.config.metrics&.increment( + 'publish_error', + tags: %W(topic:#{topic}), + by: payloads.size + ) + Deimos.instrument( + 'produce_error', + producer: producer, + topic: topic, + exception_object: exception, + payloads: payloads + ) + end + end + # Listens for any exceptions that happen during publishing and re-publishes # as a Deimos event. # @param event [ActiveSupport::Notifications::Event] # @return [void] def self.send_produce_error(event) exception = event.payload[:exception_object] return unless exception if exception.respond_to?(:failed_messages) - messages = exception.failed_messages - messages.group_by(&:topic).each do |topic, batch| - producer = Deimos::Producer.descendants.find { |c| c.topic == topic } - next if batch.empty? || !producer - - decoder = Deimos.schema_backend(schema: producer.config[:schema], - namespace: producer.config[:namespace]) - payloads = batch.map { |m| decoder.decode(m.value) } - - Deimos.config.metrics&.increment( - 'publish_error', - tags: %W(topic:#{topic}), - by: payloads.size - ) - Deimos.instrument( - 'produce_error', - producer: producer, - topic: topic, - exception_object: exception, - payloads: payloads - ) - end + handle_exception_with_messages(exception) else Deimos.config.metrics&.increment( 'publish_error', by: event.payload[:message_count] || 1 )