lib/karafka/web/processing/consumers/aggregators/metrics.rb in karafka-web-0.8.0 vs lib/karafka/web/processing/consumers/aggregators/metrics.rb in karafka-web-0.8.1

- old
+ new

@@ -9,11 +9,11 @@ # state that can then be used to enrich previous time based states to get a time-series # values for charts and metrics class Metrics < Base # Current schema version # This is used for detecting incompatible changes and writing migrations - SCHEMA_VERSION = '1.1.2' + SCHEMA_VERSION = '1.2.1' def initialize super @aggregated_tracker = TimeSeriesTracker.new(metrics.fetch(:aggregated)) @consumer_groups_tracker = TimeSeriesTracker.new(metrics.fetch(:consumer_groups)) @@ -90,16 +90,15 @@ # increase needed storage. def materialize_consumers_groups_current_state cgs = {} iterate_partitions_data do |group_name, topic_name, partitions_data| - lags = partitions_data - .map { |p_details| p_details.fetch(:lag, -1) } - .reject(&:negative?) - - lags_stored = partitions_data - .map { |p_details| p_details.fetch(:lag_stored, -1) } + lags_hybrid = partitions_data + .map do |p_details| + lag_stored = p_details.fetch(:lag_stored, -1) + lag_stored.negative? ? p_details.fetch(:lag, -1) : lag_stored + end .reject(&:negative?) offsets_hi = partitions_data .map { |p_details| p_details.fetch(:hi_offset, -1) } .reject(&:negative?) @@ -119,11 +118,10 @@ ls_offset_fd.negative? ? nil : ls_offset_fd end cgs[group_name] ||= {} cgs[group_name][topic_name] = { - lag_stored: lags_stored.sum, - lag: lags.sum, + lag_hybrid: lags_hybrid.sum, pace: offsets_hi.sum, # Take max last stable offset duration without any change. This can # indicate a hanging transaction, because the offset will not move forward # and will stay with a growing freeze duration when stuck ls_offset_fd: ls_offsets_fds.compact.max || 0