lib/deimos/instrumentation.rb in deimos-ruby-1.23.0 vs lib/deimos/instrumentation.rb in deimos-ruby-1.23.1.pre.beta1

- old
+ new

@@ -47,31 +47,38 @@ # as a Deimos event. # @param event [ActiveSupport::Notifications::Event] # @return [void] def self.send_produce_error(event) exception = event.payload[:exception_object] - return if !exception || !exception.respond_to?(:failed_messages) + return unless exception - messages = exception.failed_messages - messages.group_by(&:topic).each do |topic, batch| - producer = Deimos::Producer.descendants.find { |c| c.topic == topic } - next if batch.empty? || !producer + if exception.respond_to?(:failed_messages) + messages = exception.failed_messages + messages.group_by(&:topic).each do |topic, batch| + producer = Deimos::Producer.descendants.find { |c| c.topic == topic } + next if batch.empty? || !producer - decoder = Deimos.schema_backend(schema: producer.config[:schema], - namespace: producer.config[:namespace]) - payloads = batch.map { |m| decoder.decode(m.value) } + decoder = Deimos.schema_backend(schema: producer.config[:schema], + namespace: producer.config[:namespace]) + payloads = batch.map { |m| decoder.decode(m.value) } + Deimos.config.metrics&.increment( + 'publish_error', + tags: %W(topic:#{topic}), + by: payloads.size + ) + Deimos.instrument( + 'produce_error', + producer: producer, + topic: topic, + exception_object: exception, + payloads: payloads + ) + end + else Deimos.config.metrics&.increment( 'publish_error', - tags: %W(topic:#{topic}), - by: payloads.size - ) - Deimos.instrument( - 'produce_error', - producer: producer, - topic: topic, - exception_object: exception, - payloads: payloads + by: event.respond_to?(:[]) ? event[:message_count] : 1 ) end end end