lib/deimos/utils/db_producer.rb in deimos-ruby-1.7.0.pre.beta1 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.0.pre.beta1
- old
+ new
@@ -85,13 +85,13 @@
log_messages(compacted_messages)
Deimos.instrument('db_producer.produce', topic: @current_topic, messages: compacted_messages) do
begin
produce_messages(compacted_messages.map(&:phobos_message))
rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
- Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
@logger.error('Message batch too large, deleting...')
@logger.error(Deimos::KafkaMessage.decoded(messages))
+ Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
raise
end
end
Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
Deimos.config.metrics&.increment(
@@ -131,10 +131,14 @@
group(:topic)
if messages.none?
metrics.gauge('pending_db_messages_max_wait', 0)
end
messages.each do |record|
- time_diff = Time.zone.now - record.earliest
+ earliest = record.earliest
+ # SQLite gives a string here
+ earliest = Time.zone.parse(earliest) if earliest.is_a?(String)
+
+ time_diff = Time.zone.now - earliest
metrics.gauge('pending_db_messages_max_wait', time_diff,
tags: ["topic:#{record.topic}"])
end
end