lib/sidekiq/metrics/query.rb in sidekiq-6.5.4 vs lib/sidekiq/metrics/query.rb in sidekiq-6.5.5

- old
+ new

@@ -11,114 +11,143 @@ # Redis and return a Hash of results. # # NB: all metrics and times/dates are UTC only. We specifically do not # support timezones. class Query - # :hour, :day, :month - attr_accessor :period - - # a specific job class, e.g. "App::OrderJob" - attr_accessor :klass - - # the date specific to the period - # for :day or :hour, something like Date.today or Date.new(2022, 7, 13) - # for :month, Date.new(2022, 7, 1) - attr_accessor :date - - # for period = :hour, the specific hour, integer e.g. 1 or 18 - # note that hours and minutes do not have a leading zero so minute-specific - # keys will look like "j|20220718|7:3" for data at 07:03. - attr_accessor :hour - def initialize(pool: Sidekiq.redis_pool, now: Time.now) @time = now.utc @pool = pool @klass = nil end - # Get metric data from the last hour and roll it up - # into top processed count and execution time based on class. - def top_jobs - resultset = {} - resultset[:date] = @time.to_date - resultset[:period] = :hour - resultset[:ends_at] = @time - time = @time + # Get metric data for all jobs from the last hour + def top_jobs(minutes: 60) + result = Result.new - results = @pool.with do |conn| + time = @time + redis_results = @pool.with do |conn| conn.pipelined do |pipe| - resultset[:size] = 60 - 60.times do |idx| + minutes.times do |idx| key = "j|#{time.strftime("%Y%m%d")}|#{time.hour}:#{time.min}" pipe.hgetall key + result.prepend_bucket time time -= 60 end - resultset[:starts_at] = time end end - t = Hash.new(0) - klsset = Set.new - # merge the per-minute data into a totals hash for the hour - results.each do |hash| - hash.each { |k, v| t[k] = t[k] + v.to_i } - klsset.merge(hash.keys.map { |k| k.split("|")[0] }) + time = @time + redis_results.each do |hash| + hash.each do |k, v| + kls, metric = k.split("|") + result.job_results[kls].add_metric metric, time, v.to_i + end + time -= 60 end - resultset[:job_classes] = klsset.delete_if { |item| item.size < 3 } - resultset[:totals] = t - top = t.each_with_object({}) do |(k, v), memo| - (kls, metric) = k.split("|") - memo[metric] ||= Hash.new(0) - memo[metric][kls] = v - end - sorted = {} - top.each_pair do |metric, hash| - sorted[metric] = hash.sort_by { |k, v| v }.reverse.to_h - end - resultset[:top_classes] = sorted - resultset + result.marks = fetch_marks(result.starts_at..result.ends_at) + + result end - def for_job(klass) - resultset = {} - resultset[:date] = @time.to_date - resultset[:period] = :hour - resultset[:ends_at] = @time - marks = @pool.with { |c| c.hgetall("#{@time.strftime("%Y%m%d")}-marks") } + def for_job(klass, minutes: 60) + result = Result.new time = @time - initial = @pool.with do |conn| + redis_results = @pool.with do |conn| conn.pipelined do |pipe| - resultset[:size] = 60 - 60.times do |idx| - key = "j|#{time.strftime("%Y%m%d|%-H:%-M")}" + minutes.times do |idx| + key = "j|#{time.strftime("%Y%m%d")}|#{time.hour}:#{time.min}" pipe.hmget key, "#{klass}|ms", "#{klass}|p", "#{klass}|f" + result.prepend_bucket time time -= 60 end end end time = @time - hist = Histogram.new(klass) - results = @pool.with do |conn| - initial.map do |(ms, p, f)| - tm = Time.utc(time.year, time.month, time.mday, time.hour, time.min, 0) - { - time: tm.iso8601, - epoch: tm.to_i, - ms: ms.to_i, p: p.to_i, f: f.to_i, hist: hist.fetch(conn, time) - }.tap { |x| - x[:mark] = marks[x[:time]] if marks[x[:time]] - time -= 60 - } + @pool.with do |conn| + redis_results.each do |(ms, p, f)| + result.job_results[klass].add_metric "ms", time, ms.to_i if ms + result.job_results[klass].add_metric "p", time, p.to_i if p + result.job_results[klass].add_metric "f", time, f.to_i if f + result.job_results[klass].add_hist time, Histogram.new(klass).fetch(conn, time) + time -= 60 end end - resultset[:marks] = marks - resultset[:starts_at] = time - resultset[:data] = results - resultset + result.marks = fetch_marks(result.starts_at..result.ends_at) + + result + end + + class Result < Struct.new(:starts_at, :ends_at, :size, :buckets, :job_results, :marks) + def initialize + super + self.buckets = [] + self.marks = [] + self.job_results = Hash.new { |h, k| h[k] = JobResult.new } + end + + def prepend_bucket(time) + buckets.unshift time.strftime("%H:%M") + self.ends_at ||= time + self.starts_at = time + end + end + + class JobResult < Struct.new(:series, :hist, :totals) + def initialize + super + self.series = Hash.new { |h, k| h[k] = Hash.new(0) } + self.hist = Hash.new { |h, k| h[k] = [] } + self.totals = Hash.new(0) + end + + def add_metric(metric, time, value) + totals[metric] += value + series[metric][time.strftime("%H:%M")] += value + + # Include timing measurements in seconds for convenience + add_metric("s", time, value / 1000.0) if metric == "ms" + end + + def add_hist(time, hist_result) + hist[time.strftime("%H:%M")] = hist_result + end + + def total_avg(metric = "ms") + completed = totals["p"] - totals["f"] + totals[metric].to_f / completed + end + + def series_avg(metric = "ms") + series[metric].each_with_object(Hash.new(0)) do |(bucket, value), result| + completed = series.dig("p", bucket) - series.dig("f", bucket) + result[bucket] = completed == 0 ? 0 : value.to_f / completed + end + end + end + + class MarkResult < Struct.new(:time, :label) + def bucket + time.strftime("%H:%M") + end + end + + private + + def fetch_marks(time_range) + [].tap do |result| + marks = @pool.with { |c| c.hgetall("#{@time.strftime("%Y%m%d")}-marks") } + + marks.each do |timestamp, label| + time = Time.parse(timestamp) + if time_range.cover? time + result << MarkResult.new(time, label) + end + end + end end end end end