lib/sidekiq/metrics/query.rb in sidekiq-6.5.4 vs lib/sidekiq/metrics/query.rb in sidekiq-6.5.5
- old
+ new
@@ -11,114 +11,143 @@
# Redis and return a Hash of results.
#
# NB: all metrics and times/dates are UTC only. We specifically do not
# support timezones.
class Query
- # :hour, :day, :month
- attr_accessor :period
-
- # a specific job class, e.g. "App::OrderJob"
- attr_accessor :klass
-
- # the date specific to the period
- # for :day or :hour, something like Date.today or Date.new(2022, 7, 13)
- # for :month, Date.new(2022, 7, 1)
- attr_accessor :date
-
- # for period = :hour, the specific hour, integer e.g. 1 or 18
- # note that hours and minutes do not have a leading zero so minute-specific
- # keys will look like "j|20220718|7:3" for data at 07:03.
- attr_accessor :hour
-
def initialize(pool: Sidekiq.redis_pool, now: Time.now)
@time = now.utc
@pool = pool
@klass = nil
end
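A quick illustration of the UTC normalization noted above, as a minimal sketch (times are hypothetical):

    # Whatever "now" is given, the query operates on its UTC equivalent.
    q = Sidekiq::Metrics::Query.new(now: Time.new(2022, 7, 18, 9, 3, 0, "+02:00"))
    # internally, @time is 2022-07-18 07:03:00 UTC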
- # Get metric data from the last hour and roll it up
- # into top processed count and execution time based on class.
- def top_jobs
- resultset = {}
- resultset[:date] = @time.to_date
- resultset[:period] = :hour
- resultset[:ends_at] = @time
- time = @time
+ # Get metric data for all jobs over the trailing `minutes` minutes (default: the last hour)
+ def top_jobs(minutes: 60)
+ result = Result.new
- results = @pool.with do |conn|
+ time = @time
+ redis_results = @pool.with do |conn|
conn.pipelined do |pipe|
- resultset[:size] = 60
- 60.times do |idx|
+ minutes.times do |idx|
key = "j|#{time.strftime("%Y%m%d")}|#{time.hour}:#{time.min}"
pipe.hgetall key
+ result.prepend_bucket time
time -= 60
end
- resultset[:starts_at] = time
end
end
- t = Hash.new(0)
- klsset = Set.new
- # merge the per-minute data into a totals hash for the hour
- results.each do |hash|
- hash.each { |k, v| t[k] = t[k] + v.to_i }
- klsset.merge(hash.keys.map { |k| k.split("|")[0] })
+ time = @time
+ redis_results.each do |hash|
+ hash.each do |k, v|
+ kls, metric = k.split("|")
+ result.job_results[kls].add_metric metric, time, v.to_i
+ end
+ time -= 60
end
- resultset[:job_classes] = klsset.delete_if { |item| item.size < 3 }
- resultset[:totals] = t
- top = t.each_with_object({}) do |(k, v), memo|
- (kls, metric) = k.split("|")
- memo[metric] ||= Hash.new(0)
- memo[metric][kls] = v
- end
- sorted = {}
- top.each_pair do |metric, hash|
- sorted[metric] = hash.sort_by { |k, v| v }.reverse.to_h
- end
- resultset[:top_classes] = sorted
- resultset
+ result.marks = fetch_marks(result.starts_at..result.ends_at)
+
+ result
end
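A minimal usage sketch of the reworked method (job classes and values are hypothetical):

    # Roll up the last 30 minutes of per-minute data into one Result.
    result = Sidekiq::Metrics::Query.new.top_jobs(minutes: 30)
    result.buckets   # => ["13:05", "13:06", ...] oldest-first "HH:MM" labels
    result.job_results.each do |klass, jr|
      puts "#{klass}: #{jr.totals["p"]} processed, #{jr.totals["f"]} failed, #{jr.totals["ms"]}ms"
    end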
- def for_job(klass)
- resultset = {}
- resultset[:date] = @time.to_date
- resultset[:period] = :hour
- resultset[:ends_at] = @time
- marks = @pool.with { |c| c.hgetall("#{@time.strftime("%Y%m%d")}-marks") }
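+ # Get metric data for a single job class over the trailing `minutes` minutes (default: the last hour)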
+ def for_job(klass, minutes: 60)
+ result = Result.new
time = @time
- initial = @pool.with do |conn|
+ redis_results = @pool.with do |conn|
conn.pipelined do |pipe|
- resultset[:size] = 60
- 60.times do |idx|
- key = "j|#{time.strftime("%Y%m%d|%-H:%-M")}"
+ minutes.times do |idx|
+ key = "j|#{time.strftime("%Y%m%d")}|#{time.hour}:#{time.min}"
pipe.hmget key, "#{klass}|ms", "#{klass}|p", "#{klass}|f"
+ result.prepend_bucket time
time -= 60
end
end
end
time = @time
- hist = Histogram.new(klass)
- results = @pool.with do |conn|
- initial.map do |(ms, p, f)|
- tm = Time.utc(time.year, time.month, time.mday, time.hour, time.min, 0)
- {
- time: tm.iso8601,
- epoch: tm.to_i,
- ms: ms.to_i, p: p.to_i, f: f.to_i, hist: hist.fetch(conn, time)
- }.tap { |x|
- x[:mark] = marks[x[:time]] if marks[x[:time]]
- time -= 60
- }
+ @pool.with do |conn|
+ redis_results.each do |(ms, p, f)|
+ result.job_results[klass].add_metric "ms", time, ms.to_i if ms
+ result.job_results[klass].add_metric "p", time, p.to_i if p
+ result.job_results[klass].add_metric "f", time, f.to_i if f
+ result.job_results[klass].add_hist time, Histogram.new(klass).fetch(conn, time)
+ time -= 60
end
end
- resultset[:marks] = marks
- resultset[:starts_at] = time
- resultset[:data] = results
- resultset
+ result.marks = fetch_marks(result.starts_at..result.ends_at)
+
+ result
+ end
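A matching sketch for the per-class query ("App::OrderJob" echoes the example in the removed attr_accessor comment; values are hypothetical):

    # Drill into one job class, including its execution-time histogram.
    result = Sidekiq::Metrics::Query.new.for_job("App::OrderJob")
    jr = result.job_results["App::OrderJob"]
    jr.series["ms"]   # => {"13:05" => 1200, "13:06" => 980, ...} total ms per minute bucket
    jr.hist["13:05"]  # => histogram bucket counts from Histogram#fetch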
+
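+ # Aggregated output of a query: the covered time window, oldest-first
+ # minute buckets, per-class JobResults, and deploy marks within the window.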
+ class Result < Struct.new(:starts_at, :ends_at, :size, :buckets, :job_results, :marks)
+ def initialize
+ super
+ self.buckets = []
+ self.marks = []
+ self.job_results = Hash.new { |h, k| h[k] = JobResult.new }
+ end
+
+ def prepend_bucket(time)
+ buckets.unshift time.strftime("%H:%M")
+ self.ends_at ||= time
+ self.starts_at = time
+ end
+ end
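Because top_jobs and for_job walk from the newest minute to the oldest, unshift leaves buckets oldest-first, ends_at is fixed by the first call, and starts_at by the last. A minimal sketch:

    r = Sidekiq::Metrics::Query::Result.new
    t = Time.utc(2022, 7, 18, 7, 3)
    r.prepend_bucket t       # newest minute, seen first
    r.prepend_bucket t - 60  # older minute, seen second
    r.buckets   # => ["07:02", "07:03"]
    r.ends_at   # => 2022-07-18 07:03:00 UTC (||= keeps the first value)
    r.starts_at # => 2022-07-18 07:02:00 UTC (overwritten on every call)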
+
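+ # Per-class rollup: a per-minute series and running totals for each metric
+ # ("p" processed, "f" failed, "ms" execution time, "s" derived seconds),
+ # plus per-minute histogram data.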
+ class JobResult < Struct.new(:series, :hist, :totals)
+ def initialize
+ super
+ self.series = Hash.new { |h, k| h[k] = Hash.new(0) }
+ self.hist = Hash.new { |h, k| h[k] = [] }
+ self.totals = Hash.new(0)
+ end
+
+ def add_metric(metric, time, value)
+ totals[metric] += value
+ series[metric][time.strftime("%H:%M")] += value
+
+ # Include timing measurements in seconds for convenience
+ add_metric("s", time, value / 1000.0) if metric == "ms"
+ end
+
+ def add_hist(time, hist_result)
+ hist[time.strftime("%H:%M")] = hist_result
+ end
+
+ def total_avg(metric = "ms")
+ completed = totals["p"] - totals["f"]
+ totals[metric].to_f / completed
+ end
+
+ def series_avg(metric = "ms")
+ series[metric].each_with_object(Hash.new(0)) do |(bucket, value), result|
+ completed = series.dig("p", bucket) - series.dig("f", bucket)
+ result[bucket] = completed == 0 ? 0 : value.to_f / completed
+ end
+ end
+ end
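A worked example of the averaging math (values are hypothetical): total_avg divides the metric total by completed runs (p - f), and add_metric mirrors every "ms" value into a seconds series.

    jr = Sidekiq::Metrics::Query::JobResult.new
    t = Time.utc(2022, 7, 18, 7, 3)
    jr.add_metric "p", t, 10
    jr.add_metric "f", t, 2
    jr.add_metric "ms", t, 1600   # also records 1.6 into the "s" series
    jr.total_avg("ms")            # => 200.0 (1600ms / 8 completed)
    jr.series_avg("ms")           # => {"07:03" => 200.0}
    jr.totals["s"]                # => 1.6

Note that series_avg guards the zero-completed case explicitly, while total_avg does not, so the latter can return NaN or Infinity when p - f is zero.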
+
+ class MarkResult < Struct.new(:time, :label)
+ def bucket
+ time.strftime("%H:%M")
+ end
+ end
+
+ private
+
+ def fetch_marks(time_range)
+ [].tap do |result|
+ marks = @pool.with { |c| c.hgetall("#{@time.strftime("%Y%m%d")}-marks") }
+
+ marks.each do |timestamp, label|
+ time = Time.parse(timestamp)
+ if time_range.cover? time
+ result << MarkResult.new(time, label)
+ end
+ end
+ end
end
end
end
end
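For context, fetch_marks reads a per-day Redis hash named "YYYYMMDD-marks" whose fields are Time.parse-able timestamps and whose values are labels. A minimal sketch of seeding one by hand (the label is hypothetical; Sidekiq's own deploy-marking helper is not shown in this diff):

    Sidekiq.redis do |conn|
      now = Time.now.utc
      conn.hset "#{now.strftime("%Y%m%d")}-marks", now.iso8601, "deployed v1.2.3"
    end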