lib/kafka/datadog.rb in ruby-kafka-0.4.2 vs lib/kafka/datadog.rb in ruby-kafka-0.4.3
- old
+ new
@@ -25,28 +25,57 @@
# Once the file has been required, no further configuration is needed – all operational
# metrics are automatically emitted.
module Datadog
STATSD_NAMESPACE = "ruby_kafka"
- def self.statsd
- @statsd ||= ::Datadog::Statsd.new(::Datadog::Statsd::DEFAULT_HOST, ::Datadog::Statsd::DEFAULT_PORT, namespace: STATSD_NAMESPACE)
- end
+ class << self
+ def statsd
+ @statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags)
+ end
- def self.host=(host)
- statsd.host = host
- end
+ def host
+ @host ||= ::Datadog::Statsd::DEFAULT_HOST
+ end
- def self.port=(port)
- statsd.port = port
- end
+ def host=(host)
+ @host = host
+ clear
+ end
- def self.namespace=(namespace)
- statsd.namespace = namespace
- end
+ def port
+ @port ||= ::Datadog::Statsd::DEFAULT_PORT
+ end
- def self.tags=(tags)
- statsd.tags = tags
+ def port=(port)
+ @port = port
+ clear
+ end
+
+ def namespace
+ @namespace ||= STATSD_NAMESPACE
+ end
+
+ def namespace=(namespace)
+ @namespace = namespace
+ clear
+ end
+
+ def tags
+ @tags ||= []
+ end
+
+ def tags=(tags)
+ @tags = tags
+ clear
+ end
+
+ private
+
+ def clear
+ @statsd && @statsd.close
+ @statsd = nil
+ end
end
class StatsdSubscriber < ActiveSupport::Subscriber
private
@@ -111,10 +140,11 @@
gauge("consumer.lag", lag, tags: tags)
end
def process_batch(event)
+ lag = event.payload.fetch(:offset_lag)
messages = event.payload.fetch(:message_count)
tags = {
client: event.payload.fetch(:client_id),
group_id: event.payload.fetch(:group_id),
@@ -126,9 +156,11 @@
increment("consumer.process_batch.errors", tags: tags)
else
timing("consumer.process_batch.latency", event.duration, tags: tags)
count("consumer.messages", messages, tags: tags)
end
+
+ gauge("consumer.lag", lag, tags: tags)
end
def join_group(event)
tags = {
client: event.payload.fetch(:client_id),