lib/metrics.rb in streamdal-0.0.1 vs lib/metrics.rb in streamdal-0.0.2

- old
+ new

@@ -1,22 +1,24 @@ +# frozen_string_literal: true + module Streamdal class Counter attr_accessor :last_updated, :name, :aud, :labels def initialize(name, aud, labels = {}, value = 0.0) @name = name @aud = aud @labels = labels - @value = 0 - @last_updated = Time::now + @value = value + @last_updated = Time.now @value_mtx = Mutex.new end def incr(val) @value_mtx.synchronize do - @value = @value + val - @last_updated = Time::now + @value += val + @last_updated = Time.now end end def reset @value_mtx.synchronize do @@ -31,34 +33,32 @@ end end class Metrics - COUNTER_CONSUME_BYTES = "counter_consume_bytes" - COUNTER_CONSUME_PROCESSED = "counter_consume_processed" - COUNTER_CONSUME_ERRORS = "counter_consume_errors" - COUNTER_PRODUCE_BYTES = "counter_produce_bytes" - COUNTER_PRODUCE_PROCESSED = "counter_produce_processed" - COUNTER_PRODUCE_ERRORS = "counter_produce_errors" - COUNTER_NOTIFY = "counter_notify" - COUNTER_DROPPED_TAIL_MESSAGES = "counter_dropped_tail_messages" - COUNTER_CONSUME_BYTES_RATE = "counter_consume_bytes_rate" - COUNTER_PRODUCE_BYTES_RATE = "counter_produce_bytes_rate" - COUNTER_CONSUME_PROCESSED_RATE = "counter_consume_processed_rate" - COUNTER_PRODUCE_PROCESSED_RATE = "counter_produce_processed_rate" + COUNTER_CONSUME_BYTES = 'counter_consume_bytes' + COUNTER_CONSUME_PROCESSED = 'counter_consume_processed' + COUNTER_CONSUME_ERRORS = 'counter_consume_errors' + COUNTER_PRODUCE_BYTES = 'counter_produce_bytes' + COUNTER_PRODUCE_PROCESSED = 'counter_produce_processed' + COUNTER_PRODUCE_ERRORS = 'counter_produce_errors' + COUNTER_NOTIFY = 'counter_notify' + COUNTER_DROPPED_TAIL_MESSAGES = 'counter_dropped_tail_messages' + COUNTER_CONSUME_BYTES_RATE = 'counter_consume_bytes_rate' + COUNTER_PRODUCE_BYTES_RATE = 'counter_produce_bytes_rate' + COUNTER_CONSUME_PROCESSED_RATE = 'counter_consume_processed_rate' + COUNTER_PRODUCE_PROCESSED_RATE = 'counter_produce_processed_rate' WORKER_POOL_SIZE = 3 DEFAULT_COUNTER_REAPER_INTERVAL = 10 DEFAULT_COUNTER_TTL = 10 DEFAULT_COUNTER_PUBLISH_INTERVAL = 1 CounterEntry = Struct.new(:name, :aud, :labels, :value) def initialize(cfg) - if cfg.nil? - raise ArgumentError, "cfg is nil" - end + raise ArgumentError, 'cfg is nil' if cfg.nil? @cfg = cfg @log = cfg[:log] @counters = {} @counters_mtx = Mutex.new @@ -78,45 +78,37 @@ # Let loops exit sleep(1) # Exit any remaining threads @workers.each do |w| - if w.running? - w.exit - end + w.exit if w.running? end end def self.composite_id(counter_name, labels = {}) - if labels.nil? - labels = {} - end - "#{counter_name}-#{labels.values.join("-")}".freeze + labels = {} if labels.nil? + "#{counter_name}-#{labels.values.join('-')}" end def get_counter(ce) - if ce.nil? - raise ArgumentError, "ce is nil" - end + raise ArgumentError, 'ce is nil' if ce.nil? - k = Metrics::composite_id(ce.name, ce.labels) + k = Metrics.composite_id(ce.name, ce.labels) @counters_mtx.synchronize do - if @counters.key?(k) - @counters[k] - end + @counters[k] if @counters.key?(k) end # No counter exists, create a new one and return it new_counter(ce) end def new_counter(ce) c = Counter.new(ce.name, ce.aud, ce.labels, ce.value) @counters_mtx.synchronize do - @counters[Metrics::composite_id(ce.name, ce.labels)] = c + @counters[Metrics.composite_id(ce.name, ce.labels)] = c end c end @@ -164,46 +156,43 @@ end def _run_publisher # Background thread that reads values from counters, adds them to the publish queue, and then # resets the counter's value back to zero - unless @exit - @log.debug("Starting publisher") + return if @exit - # Sleep on startup and then and between each loop run - sleep(DEFAULT_COUNTER_PUBLISH_INTERVAL) + @log.debug('Starting publisher') - # Get all counters - # Loop over each counter, get the value, - # if value > 0, continue - # if now() - last_updated > 10 seconds, remove counter - # Grab copy of counters - @counters_mtx.lock - new_counters = @counters.dup - @counters_mtx.unlock + # Sleep on startup and then and between each loop run + sleep(DEFAULT_COUNTER_PUBLISH_INTERVAL) - new_counters.each do |_, counter| - if counter.val == 0 - next - end + # Get all counters + # Loop over each counter, get the value, + # if value > 0, continue + # if now() - last_updated > 10 seconds, remove counter + # Grab copy of counters + @counters_mtx.lock + new_counters = @counters.dup + @counters_mtx.unlock - ce = CounterEntry.new(counter.name, counter.aud, counter.labels, counter.val) - counter.reset + new_counters.each_value do |counter| + next if counter.val.zero? - @publish_queue.push(ce) - end + ce = CounterEntry.new(counter.name, counter.aud, counter.labels, counter.val) + counter.reset + + @publish_queue.push(ce) end end def _run_publisher_worker(worker_id) @log.debug("Starting publisher worker '#{worker_id}'") until @exit ce = @incr_queue.pop - if ce.nil? - next - end + next if ce.nil? + begin _publish_metrics(ce) rescue => e @log.error("Failed to publish metrics: #{e}: #{ce.inspect}") end @@ -211,11 +200,11 @@ @log.debug("Exiting publisher worker '#{worker_id}'") end def _run_reaper - @log.debug("Starting reaper") + @log.debug('Starting reaper') until @exit # Sleep on startup and then and between each loop run sleep(DEFAULT_COUNTER_REAPER_INTERVAL) @@ -224,23 +213,21 @@ # if value > 0, continue # if now() - last_updated > 10 seconds, remove counter # Grab copy of counters @counters_mtx.synchronize do @counters.each do |name, counter| - if counter.val > 0 - next - end + next if counter.val.positive? - if Time::now - counter.last_updated > DEFAULT_COUNTER_TTL + if Time.now - counter.last_updated > DEFAULT_COUNTER_TTL @log.debug("Reaping counter '#{name}'") @counters.delete(name) end end end end - @log.debug("Exiting reaper") + @log.debug('Exiting reaper') end def _run_incrementer_worker(worker_id) @log.debug("Starting incrementer worker '#{worker_id}'") @@ -257,9 +244,9 @@ @log.debug("Exiting incrementer worker '#{worker_id}'") end # Returns metadata for gRPC requests to the internal gRPC API def _metadata - { "auth-token" => @cfg[:streamdal_token].to_s } + { 'auth-token' => @cfg[:streamdal_token].to_s } end end -end \ No newline at end of file +end