lib/kafka/datadog.rb in ruby-kafka-0.5.1.beta1 vs lib/kafka/datadog.rb in ruby-kafka-0.5.1.beta2
- old
+ new
@@ -120,11 +120,13 @@
attach_to "connection.kafka"
end
class ConsumerSubscriber < StatsdSubscriber
def process_message(event)
- lag = event.payload.fetch(:offset_lag)
+ offset_lag = event.payload.fetch(:offset_lag)
+ create_time = event.payload.fetch(:create_time)
+ time_lag = create_time && ((Time.now - create_time) * 1000).to_i
tags = {
client: event.payload.fetch(:client_id),
group_id: event.payload.fetch(:group_id),
topic: event.payload.fetch(:topic),
@@ -136,10 +138,15 @@
else
timing("consumer.process_message.latency", event.duration, tags: tags)
increment("consumer.messages", tags: tags)
end
- gauge("consumer.lag", lag, tags: tags)
+ gauge("consumer.lag", offset_lag, tags: tags)
+
+ # Not all messages have timestamps.
+ if time_lag
+ gauge("consumer.time_lag", time_lag, tags: tags)
+ end
end
def process_batch(event)
lag = event.payload.fetch(:offset_lag)
messages = event.payload.fetch(:message_count)