lib/kafka/datadog.rb in ruby-kafka-0.4.2 vs lib/kafka/datadog.rb in ruby-kafka-0.4.3

- old
+ new

@@ -25,28 +25,57 @@ # Once the file has been required, no further configuration is needed – all operational # metrics are automatically emitted. module Datadog STATSD_NAMESPACE = "ruby_kafka" - def self.statsd - @statsd ||= ::Datadog::Statsd.new(::Datadog::Statsd::DEFAULT_HOST, ::Datadog::Statsd::DEFAULT_PORT, namespace: STATSD_NAMESPACE) - end + class << self + def statsd + @statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags) + end - def self.host=(host) - statsd.host = host - end + def host + @host ||= ::Datadog::Statsd::DEFAULT_HOST + end - def self.port=(port) - statsd.port = port - end + def host=(host) + @host = host + clear + end - def self.namespace=(namespace) - statsd.namespace = namespace - end + def port + @port ||= ::Datadog::Statsd::DEFAULT_PORT + end - def self.tags=(tags) - statsd.tags = tags + def port=(port) + @port = port + clear + end + + def namespace + @namespace ||= STATSD_NAMESPACE + end + + def namespace=(namespace) + @namespace = namespace + clear + end + + def tags + @tags ||= [] + end + + def tags=(tags) + @tags = tags + clear + end + + private + + def clear + @statsd && @statsd.close + @statsd = nil + end end class StatsdSubscriber < ActiveSupport::Subscriber private @@ -111,10 +140,11 @@ gauge("consumer.lag", lag, tags: tags) end def process_batch(event) + lag = event.payload.fetch(:offset_lag) messages = event.payload.fetch(:message_count) tags = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), @@ -126,9 +156,11 @@ increment("consumer.process_batch.errors", tags: tags) else timing("consumer.process_batch.latency", event.duration, tags: tags) count("consumer.messages", messages, tags: tags) end + + gauge("consumer.lag", lag, tags: tags) end def join_group(event) tags = { client: event.payload.fetch(:client_id),