lib/deimos/instrumentation.rb in deimos-ruby-1.23.0 vs lib/deimos/instrumentation.rb in deimos-ruby-1.23.1.pre.beta1
- old
+ new
@@ -47,31 +47,38 @@
# as a Deimos event.
# @param event [ActiveSupport::Notifications::Event]
# @return [void]
def self.send_produce_error(event)
exception = event.payload[:exception_object]
- return if !exception || !exception.respond_to?(:failed_messages)
+ return unless exception
- messages = exception.failed_messages
- messages.group_by(&:topic).each do |topic, batch|
- producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
- next if batch.empty? || !producer
+ if exception.respond_to?(:failed_messages)
+ messages = exception.failed_messages
+ messages.group_by(&:topic).each do |topic, batch|
+ producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
+ next if batch.empty? || !producer
- decoder = Deimos.schema_backend(schema: producer.config[:schema],
- namespace: producer.config[:namespace])
- payloads = batch.map { |m| decoder.decode(m.value) }
+ decoder = Deimos.schema_backend(schema: producer.config[:schema],
+ namespace: producer.config[:namespace])
+ payloads = batch.map { |m| decoder.decode(m.value) }
+ Deimos.config.metrics&.increment(
+ 'publish_error',
+ tags: %W(topic:#{topic}),
+ by: payloads.size
+ )
+ Deimos.instrument(
+ 'produce_error',
+ producer: producer,
+ topic: topic,
+ exception_object: exception,
+ payloads: payloads
+ )
+ end
+ else
Deimos.config.metrics&.increment(
'publish_error',
- tags: %W(topic:#{topic}),
- by: payloads.size
- )
- Deimos.instrument(
- 'produce_error',
- producer: producer,
- topic: topic,
- exception_object: exception,
- payloads: payloads
+ by: event.respond_to?(:[]) ? event[:message_count] : 1
)
end
end
end