lib/karafka/web/processing/consumers/aggregators/metrics.rb in karafka-web-0.8.0 vs lib/karafka/web/processing/consumers/aggregators/metrics.rb in karafka-web-0.8.1
- old
+ new
@@ -9,11 +9,11 @@
# state that can then be used to enrich previous time based states to get a time-series
# values for charts and metrics
class Metrics < Base
# Current schema version
# This is used for detecting incompatible changes and writing migrations
- SCHEMA_VERSION = '1.1.2'
+ SCHEMA_VERSION = '1.2.1'
def initialize
super
@aggregated_tracker = TimeSeriesTracker.new(metrics.fetch(:aggregated))
@consumer_groups_tracker = TimeSeriesTracker.new(metrics.fetch(:consumer_groups))
@@ -90,16 +90,15 @@
# increase needed storage.
def materialize_consumers_groups_current_state
cgs = {}
iterate_partitions_data do |group_name, topic_name, partitions_data|
- lags = partitions_data
- .map { |p_details| p_details.fetch(:lag, -1) }
- .reject(&:negative?)
-
- lags_stored = partitions_data
- .map { |p_details| p_details.fetch(:lag_stored, -1) }
+ lags_hybrid = partitions_data
+ .map do |p_details|
+ lag_stored = p_details.fetch(:lag_stored, -1)
+ lag_stored.negative? ? p_details.fetch(:lag, -1) : lag_stored
+ end
.reject(&:negative?)
offsets_hi = partitions_data
.map { |p_details| p_details.fetch(:hi_offset, -1) }
.reject(&:negative?)
@@ -119,11 +118,10 @@
ls_offset_fd.negative? ? nil : ls_offset_fd
end
cgs[group_name] ||= {}
cgs[group_name][topic_name] = {
- lag_stored: lags_stored.sum,
- lag: lags.sum,
+ lag_hybrid: lags_hybrid.sum,
pace: offsets_hi.sum,
# Take max last stable offset duration without any change. This can
# indicate a hanging transaction, because the offset will not move forward
# and will stay with a growing freeze duration when stuck
ls_offset_fd: ls_offsets_fds.compact.max || 0