lib/kafka/datadog.rb in ruby-kafka-0.5.2.beta3 vs lib/kafka/datadog.rb in ruby-kafka-0.5.2
- old
+ new
@@ -120,10 +120,11 @@
attach_to "connection.kafka"
end
class ConsumerSubscriber < StatsdSubscriber
def process_message(event)
+ offset = event.payload.fetch(:offset)
offset_lag = event.payload.fetch(:offset_lag)
create_time = event.payload.fetch(:create_time)
time_lag = create_time && ((Time.now - create_time) * 1000).to_i
tags = {
@@ -138,19 +139,21 @@
else
timing("consumer.process_message.latency", event.duration, tags: tags)
increment("consumer.messages", tags: tags)
end
+ gauge("consumer.offset", offset, tags: tags)
gauge("consumer.lag", offset_lag, tags: tags)
# Not all messages have timestamps.
if time_lag
gauge("consumer.time_lag", time_lag, tags: tags)
end
end
def process_batch(event)
+ offset = event.payload.fetch(:last_offset)
lag = event.payload.fetch(:offset_lag)
messages = event.payload.fetch(:message_count)
tags = {
client: event.payload.fetch(:client_id),
@@ -164,9 +167,10 @@
else
timing("consumer.process_batch.latency", event.duration, tags: tags)
count("consumer.messages", messages, tags: tags)
end
+ gauge("consumer.offset", offset, tags: tags)
gauge("consumer.lag", lag, tags: tags)
end
def join_group(event)
tags = {