lib/messaging/adapters/postgres/consumer.rb in messaging-4.0.2.pre vs lib/messaging/adapters/postgres/consumer.rb in messaging-4.0.3.pre
- old
+ new
@@ -42,10 +42,11 @@
save
end
def process_messages
while @running do
+ Meter.histogram('messaging.consumer.lag', unprocessed_messages_count, tags: { consumer: name })
messages = fetch_messages
messages.each do |message|
process_message(message)
end
save if messages.any?
@@ -60,9 +61,13 @@
end
def pause_polling_for_5_s
Messaging.logger.debug "[#{name}] No new messages. Sleeping"
sleep 5
+ end
+
+ def unprocessed_messages_count
+ fetch_messages.count
end
def fetch_messages
SerializedMessage.in_category(categories)
.with_transaction_id_lower_than_any_currently_running_transaction