lib/metrics.rb in streamdal-0.0.1 vs lib/metrics.rb in streamdal-0.0.2
- old
+ new
@@ -1,22 +1,24 @@
+# frozen_string_literal: true
+
module Streamdal
class Counter
attr_accessor :last_updated, :name, :aud, :labels
def initialize(name, aud, labels = {}, value = 0.0)
@name = name
@aud = aud
@labels = labels
- @value = 0
- @last_updated = Time::now
+ @value = value
+ @last_updated = Time.now
@value_mtx = Mutex.new
end
def incr(val)
@value_mtx.synchronize do
- @value = @value + val
- @last_updated = Time::now
+ @value += val
+ @last_updated = Time.now
end
end
def reset
@value_mtx.synchronize do
@@ -31,34 +33,32 @@
end
end
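
For orientation, a minimal usage sketch of the Counter class above. Illustrative only: it assumes a `val` reader, which is not visible in this hunk but is what the Metrics class below calls.

    # Thread-safe counting: incr (and reset) synchronize on @value_mtx.
    # The second argument is the audience; nil is just a placeholder here.
    counter = Streamdal::Counter.new('counter_produce_bytes', nil, { service: 'orders' })

    threads = 4.times.map do
      Thread.new { 1_000.times { counter.incr(1) } }
    end
    threads.each(&:join)

    counter.val   # => 4000 (assuming the val reader noted above)
    counter.reset # value back to zero
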
class Metrics
- COUNTER_CONSUME_BYTES = "counter_consume_bytes"
- COUNTER_CONSUME_PROCESSED = "counter_consume_processed"
- COUNTER_CONSUME_ERRORS = "counter_consume_errors"
- COUNTER_PRODUCE_BYTES = "counter_produce_bytes"
- COUNTER_PRODUCE_PROCESSED = "counter_produce_processed"
- COUNTER_PRODUCE_ERRORS = "counter_produce_errors"
- COUNTER_NOTIFY = "counter_notify"
- COUNTER_DROPPED_TAIL_MESSAGES = "counter_dropped_tail_messages"
- COUNTER_CONSUME_BYTES_RATE = "counter_consume_bytes_rate"
- COUNTER_PRODUCE_BYTES_RATE = "counter_produce_bytes_rate"
- COUNTER_CONSUME_PROCESSED_RATE = "counter_consume_processed_rate"
- COUNTER_PRODUCE_PROCESSED_RATE = "counter_produce_processed_rate"
+ COUNTER_CONSUME_BYTES = 'counter_consume_bytes'
+ COUNTER_CONSUME_PROCESSED = 'counter_consume_processed'
+ COUNTER_CONSUME_ERRORS = 'counter_consume_errors'
+ COUNTER_PRODUCE_BYTES = 'counter_produce_bytes'
+ COUNTER_PRODUCE_PROCESSED = 'counter_produce_processed'
+ COUNTER_PRODUCE_ERRORS = 'counter_produce_errors'
+ COUNTER_NOTIFY = 'counter_notify'
+ COUNTER_DROPPED_TAIL_MESSAGES = 'counter_dropped_tail_messages'
+ COUNTER_CONSUME_BYTES_RATE = 'counter_consume_bytes_rate'
+ COUNTER_PRODUCE_BYTES_RATE = 'counter_produce_bytes_rate'
+ COUNTER_CONSUME_PROCESSED_RATE = 'counter_consume_processed_rate'
+ COUNTER_PRODUCE_PROCESSED_RATE = 'counter_produce_processed_rate'
WORKER_POOL_SIZE = 3
DEFAULT_COUNTER_REAPER_INTERVAL = 10
DEFAULT_COUNTER_TTL = 10
DEFAULT_COUNTER_PUBLISH_INTERVAL = 1
CounterEntry = Struct.new(:name, :aud, :labels, :value)
def initialize(cfg)
- if cfg.nil?
- raise ArgumentError, "cfg is nil"
- end
+ raise ArgumentError, 'cfg is nil' if cfg.nil?
@cfg = cfg
@log = cfg[:log]
@counters = {}
@counters_mtx = Mutex.new
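
A hedged sketch of constructing a Metrics instance. Only the cfg keys actually read in this file are shown (:log here, :streamdal_token in _metadata further down); the real client configuration is assumed to carry more.

    require 'logger'

    cfg = {
      log: Logger.new($stdout),                          # becomes @log
      streamdal_token: ENV.fetch('STREAMDAL_TOKEN', '')  # read by _metadata for gRPC auth
    }

    metrics = Streamdal::Metrics.new(cfg)
    # Streamdal::Metrics.new(nil) raises ArgumentError, 'cfg is nil'
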
@@ -78,45 +78,37 @@
# Let loops exit
sleep(1)
# Exit any remaining threads
@workers.each do |w|
- if w.running?
- w.exit
- end
+ w.exit if w.running?
end
end
def self.composite_id(counter_name, labels = {})
- if labels.nil?
- labels = {}
- end
- "#{counter_name}-#{labels.values.join("-")}".freeze
+ labels = {} if labels.nil?
+ "#{counter_name}-#{labels.values.join('-')}"
end
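
Because the key is just the counter name joined with the label values (keys are ignored), composite_id behaves like this for illustrative labels:

    Streamdal::Metrics.composite_id('counter_produce_bytes', { service: 'billing', operation: 'publish' })
    # => "counter_produce_bytes-billing-publish"

    Streamdal::Metrics.composite_id('counter_produce_bytes', nil)
    # => "counter_produce_bytes-"   (nil labels are treated as an empty hash)
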
def get_counter(ce)
- if ce.nil?
- raise ArgumentError, "ce is nil"
- end
+ raise ArgumentError, 'ce is nil' if ce.nil?
- k = Metrics::composite_id(ce.name, ce.labels)
+ k = Metrics.composite_id(ce.name, ce.labels)
@counters_mtx.synchronize do
- if @counters.key?(k)
- @counters[k]
- end
+ @counters[k] if @counters.key?(k)
end
# No counter exists, create a new one and return it
new_counter(ce)
end
def new_counter(ce)
c = Counter.new(ce.name, ce.aud, ce.labels, ce.value)
@counters_mtx.synchronize do
- @counters[Metrics::composite_id(ce.name, ce.labels)] = c
+ @counters[Metrics.composite_id(ce.name, ce.labels)] = c
end
c
end
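
Putting those pieces together, a hedged sketch of recording one observation through a CounterEntry, reusing the metrics object from the earlier sketch (audience and labels are placeholders):

    ce = Streamdal::Metrics::CounterEntry.new(
      Streamdal::Metrics::COUNTER_PRODUCE_BYTES, # 'counter_produce_bytes'
      nil,                                       # audience placeholder
      { service: 'billing' },
      128.0                                      # bytes observed
    )

    counter = metrics.get_counter(ce) # stored under Metrics.composite_id(ce.name, ce.labels)
    counter.incr(ce.value)
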
@@ -164,46 +156,43 @@
end
def _run_publisher
# Background thread that reads values from counters, adds them to the publish queue, and then
# resets the counter's value back to zero
- unless @exit
- @log.debug("Starting publisher")
+ return if @exit
- # Sleep on startup and then and between each loop run
- sleep(DEFAULT_COUNTER_PUBLISH_INTERVAL)
+ @log.debug('Starting publisher')
- # Get all counters
- # Loop over each counter, get the value,
- # if value > 0, continue
- # if now() - last_updated > 10 seconds, remove counter
- # Grab copy of counters
- @counters_mtx.lock
- new_counters = @counters.dup
- @counters_mtx.unlock
+ # Sleep on startup and then between each loop run
+ sleep(DEFAULT_COUNTER_PUBLISH_INTERVAL)
- new_counters.each do |_, counter|
- if counter.val == 0
- next
- end
+ # Grab a copy of the counters under the mutex,
+ # loop over each counter in the copy,
+ # skip counters whose value is zero,
+ # queue a CounterEntry for each remaining counter
+ # and reset it after queueing
+ @counters_mtx.lock
+ new_counters = @counters.dup
+ @counters_mtx.unlock
- ce = CounterEntry.new(counter.name, counter.aud, counter.labels, counter.val)
- counter.reset
+ new_counters.each_value do |counter|
+ next if counter.val.zero?
- @publish_queue.push(ce)
- end
+ ce = CounterEntry.new(counter.name, counter.aud, counter.labels, counter.val)
+ counter.reset
+
+ @publish_queue.push(ce)
end
end
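
The locking pattern used by the publisher, shown in isolation: take a cheap copy of the hash while holding the mutex, then do the slower work against the copy so the lock is never held while publishing. Names below are illustrative; the method above uses explicit lock/unlock, of which synchronize is the block form.

    counters_mtx = Mutex.new
    counters     = { 'counter_produce_bytes-billing' => 42 }

    # Critical section: only the shallow copy.
    snapshot = counters_mtx.synchronize { counters.dup }

    # The slow part runs against the snapshot, outside the lock.
    snapshot.each_value do |value|
      next if value.zero?
      # ...publish the value here...
    end
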
def _run_publisher_worker(worker_id)
@log.debug("Starting publisher worker '#{worker_id}'")
until @exit
ce = @incr_queue.pop
- if ce.nil?
- next
- end
+ next if ce.nil?
+
begin
_publish_metrics(ce)
rescue => e
@log.error("Failed to publish metrics: #{e}: #{ce.inspect}")
end
@@ -211,11 +200,11 @@
@log.debug("Exiting publisher worker '#{worker_id}'")
end
def _run_reaper
- @log.debug("Starting reaper")
+ @log.debug('Starting reaper')
until @exit
# Sleep on startup and then between each loop run
sleep(DEFAULT_COUNTER_REAPER_INTERVAL)
@@ -224,23 +213,21 @@
# if value > 0, continue
# if now() - last_updated > 10 seconds, remove counter
# Grab copy of counters
@counters_mtx.synchronize do
@counters.each do |name, counter|
- if counter.val > 0
- next
- end
+ next if counter.val.positive?
- if Time::now - counter.last_updated > DEFAULT_COUNTER_TTL
+ if Time.now - counter.last_updated > DEFAULT_COUNTER_TTL
@log.debug("Reaping counter '#{name}'")
@counters.delete(name)
end
end
end
end
- @log.debug("Exiting reaper")
+ @log.debug('Exiting reaper')
end
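
The reaper's eviction rule in isolation: a counter is removed only when its value is not positive and it has been idle for longer than the TTL. Illustrative numbers:

    value        = 0
    last_updated = Time.now - 15 # pretend the counter went idle 15 seconds ago
    ttl          = Streamdal::Metrics::DEFAULT_COUNTER_TTL # 10 seconds

    reap = !value.positive? && (Time.now - last_updated > ttl)
    # => true: a zero-valued counter idle past the TTL is deleted from @counters
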
def _run_incrementer_worker(worker_id)
@log.debug("Starting incrementer worker '#{worker_id}'")
@@ -257,9 +244,9 @@
@log.debug("Exiting incrementer worker '#{worker_id}'")
end
# Returns metadata for gRPC requests to the internal gRPC API
def _metadata
- { "auth-token" => @cfg[:streamdal_token].to_s }
+ { 'auth-token' => @cfg[:streamdal_token].to_s }
end
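
That hash is sent as per-request gRPC metadata. A hypothetical call site follows; the stub and RPC name are placeholders rather than anything defined in this file, and only the metadata: keyword is the point.

    # 'stub' and 'request' stand in for the gRPC client and message the library
    # builds elsewhere; the auth token travels as the 'auth-token' header.
    stub.some_rpc(request, metadata: _metadata)
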
end
-end
\ No newline at end of file
+end