lib/kafka/datadog.rb in ruby-kafka-0.5.2.beta3 vs lib/kafka/datadog.rb in ruby-kafka-0.5.2

- old
+ new

@@ -120,10 +120,11 @@ attach_to "connection.kafka" end class ConsumerSubscriber < StatsdSubscriber def process_message(event) + offset = event.payload.fetch(:offset) offset_lag = event.payload.fetch(:offset_lag) create_time = event.payload.fetch(:create_time) time_lag = create_time && ((Time.now - create_time) * 1000).to_i tags = { @@ -138,19 +139,21 @@ else timing("consumer.process_message.latency", event.duration, tags: tags) increment("consumer.messages", tags: tags) end + gauge("consumer.offset", offset, tags: tags) gauge("consumer.lag", offset_lag, tags: tags) # Not all messages have timestamps. if time_lag gauge("consumer.time_lag", time_lag, tags: tags) end end def process_batch(event) + offset = event.payload.fetch(:last_offset) lag = event.payload.fetch(:offset_lag) messages = event.payload.fetch(:message_count) tags = { client: event.payload.fetch(:client_id), @@ -164,9 +167,10 @@ else timing("consumer.process_batch.latency", event.duration, tags: tags) count("consumer.messages", messages, tags: tags) end + gauge("consumer.offset", offset, tags: tags) gauge("consumer.lag", lag, tags: tags) end def join_group(event) tags = {