lib/kafka/datadog.rb in ruby-kafka-0.5.1.beta1 vs lib/kafka/datadog.rb in ruby-kafka-0.5.1.beta2

- old
+ new

@@ -120,11 +120,13 @@ attach_to "connection.kafka" end class ConsumerSubscriber < StatsdSubscriber def process_message(event) - lag = event.payload.fetch(:offset_lag) + offset_lag = event.payload.fetch(:offset_lag) + create_time = event.payload.fetch(:create_time) + time_lag = create_time && ((Time.now - create_time) * 1000).to_i tags = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), topic: event.payload.fetch(:topic), @@ -136,10 +138,15 @@ else timing("consumer.process_message.latency", event.duration, tags: tags) increment("consumer.messages", tags: tags) end - gauge("consumer.lag", lag, tags: tags) + gauge("consumer.lag", offset_lag, tags: tags) + + # Not all messages have timestamps. + if time_lag + gauge("consumer.time_lag", time_lag, tags: tags) + end end def process_batch(event) lag = event.payload.fetch(:offset_lag) messages = event.payload.fetch(:message_count)