lib/gitlab_exporter/sidekiq.rb in gitlab-exporter-13.1.0 vs lib/gitlab_exporter/sidekiq.rb in gitlab-exporter-13.2.0

- old
+ new

@@ -5,10 +5,11 @@ module GitLab module Exporter # A prober for Sidekiq queues # # It takes the Redis URL Sidekiq is connected to + # rubocop:disable Metrics/ClassLength class SidekiqProber # The maximum depth (from the head) of each queue to probe. Probing the # entirety of a very large queue will take longer and run the risk of # timing out. But when we have a very large queue, we are most in need of # reliable metrics. This trades off completeness for predictability by @@ -35,39 +36,28 @@ def initialize(metrics: PrometheusMetrics.new, logger: nil, **opts) @opts = opts @metrics = metrics @logger = logger + + @probe_namespaced = !!opts[:probe_namespaced] # rubocop:disable Style/DoubleNegation + @probe_non_namespaced = !!opts[:probe_non_namespaced] # rubocop:disable Style/DoubleNegation + + # to maintain backward compatibility if the config is missing values + @probe_namespaced = true if opts[:probe_namespaced].nil? && opts[:probe_non_namespaced].nil? end def probe_stats - with_sidekiq do - stats = Sidekiq::Stats.new + with_sidekiq { _probe_stats } if @probe_namespaced + with_sidekiq(false) { _probe_stats(false) } if @probe_non_namespaced - @metrics.add("sidekiq_jobs_processed_total", stats.processed) - @metrics.add("sidekiq_jobs_failed_total", stats.failed) - @metrics.add("sidekiq_jobs_enqueued_size", stats.enqueued) - @metrics.add("sidekiq_jobs_scheduled_size", stats.scheduled_size) - @metrics.add("sidekiq_jobs_retry_size", stats.retry_size) - @metrics.add("sidekiq_jobs_dead_size", stats.dead_size) - - @metrics.add("sidekiq_default_queue_latency_seconds", stats.default_queue_latency) - @metrics.add("sidekiq_processes_size", stats.processes_size) - @metrics.add("sidekiq_workers_size", stats.workers_size) - end - self end def probe_queues - with_sidekiq do - Sidekiq::Queue.all.each do |queue| - @metrics.add("sidekiq_queue_size", queue.size, name: queue.name) - @metrics.add("sidekiq_queue_latency_seconds", queue.latency, name: queue.name) - @metrics.add("sidekiq_queue_paused", queue.paused? ? 1 : 0, name: queue.name) - end - end + with_sidekiq { _probe_queues } if @probe_namespaced + with_sidekiq(false) { _probe_queues(false) } if @probe_non_namespaced self end def probe_jobs @@ -76,89 +66,38 @@ self end def probe_future_sets - now = Time.now.to_f - with_sidekiq do - Sidekiq.redis do |conn| - Sidekiq::Scheduled::SETS.each do |set| - # Default to 0; if all jobs are due in the future, there is no "negative" delay. - delay = 0 - - _job, timestamp = conn.zrangebyscore(set, "-inf", now.to_s, limit: [0, 1], withscores: true).first - delay = now - timestamp if timestamp - - @metrics.add("sidekiq_#{set}_set_processing_delay_seconds", delay) - - # zcount is O(log(N)) (prob. binary search), so is still quick even with large sets - @metrics.add("sidekiq_#{set}_set_backlog_count", - conn.zcount(set, "-inf", now.to_s)) - end - end - end + with_sidekiq { _probe_future_sets } if @probe_namespaced + with_sidekiq(false) { _probe_future_sets(false) } if @probe_non_namespaced end # Count worker classes present in Sidekiq queues. This only looks at the # first PROBE_JOBS_LIMIT jobs in each queue. This means that we run a # single LRANGE command for each queue, which does not block other # commands. For queues over PROBE_JOBS_LIMIT in size, this means that we # will not have completely accurate statistics, but the probe performance # will also not degrade as the queue gets larger. def probe_jobs_limit - with_sidekiq do - job_stats = Hash.new(0) + with_sidekiq { _probe_jobs_limit } if @probe_namespaced + with_sidekiq(false) { _probe_jobs_limit(false) } if @probe_non_namespaced - Sidekiq::Queue.all.each do |queue| - Sidekiq.redis do |conn| - conn.lrange("queue:#{queue.name}", 0, PROBE_JOBS_LIMIT).each do |job| - job_class = Sidekiq.load_json(job)["class"] - - job_stats[job_class] += 1 - end - end - end - - job_stats.each do |class_name, count| - @metrics.add("sidekiq_enqueued_jobs", count, name: class_name) - end - end - self end def probe_workers - with_sidekiq do - worker_stats = Hash.new(0) + with_sidekiq { _probe_workers } if @probe_namespaced + with_sidekiq(false) { _probe_workers(false) } if @probe_non_namespaced - Sidekiq::Workers.new.map do |_pid, _tid, work| - job_klass = work["payload"]["class"] - - worker_stats[job_klass] += 1 - end - - worker_stats.each do |class_name, count| - @metrics.add("sidekiq_running_jobs", count, name: class_name) - end - end - self end def probe_retries - with_sidekiq do - retry_stats = Hash.new(0) + with_sidekiq { _probe_retries } if @probe_namespaced + with_sidekiq(false) { _probe_retries(false) } if @probe_non_namespaced - Sidekiq::RetrySet.new.map do |job| - retry_stats[job.klass] += 1 - end - - retry_stats.each do |class_name, count| - @metrics.add("sidekiq_to_be_retried_jobs", count, name: class_name) - end - end - self end def probe_dead puts "[DEPRECATED] probe_dead is now considered obsolete and will be removed in future major versions,"\ @@ -175,31 +114,125 @@ target.write(@metrics.to_s) end private - def with_sidekiq + def _probe_workers(namespace = true) + labels = add_namespace_labels? ? { namespaced: namespace } : {} + worker_stats = Hash.new(0) + + Sidekiq::Workers.new.map do |_pid, _tid, work| + job_klass = work["payload"]["class"] + + worker_stats[job_klass] += 1 + end + + worker_stats.each do |class_name, count| + @metrics.add("sidekiq_running_jobs", count, **labels.merge({ name: class_name })) + end + end + + def _probe_future_sets(namespace = true) + labels = add_namespace_labels? ? { namespaced: namespace } : {} + now = Time.now.to_f + Sidekiq.redis do |conn| + Sidekiq::Scheduled::SETS.each do |set| + # Default to 0; if all jobs are due in the future, there is no "negative" delay. + delay = 0 + + _job, timestamp = conn.zrangebyscore(set, "-inf", now.to_s, limit: [0, 1], withscores: true).first + delay = now - timestamp if timestamp + + @metrics.add("sidekiq_#{set}_set_processing_delay_seconds", delay, **labels) + + # zcount is O(log(N)) (prob. binary search), so is still quick even with large sets + @metrics.add("sidekiq_#{set}_set_backlog_count", + conn.zcount(set, "-inf", now.to_s), **labels) + end + end + end + + def _probe_stats(namespace = true) + stats = Sidekiq::Stats.new + labels = add_namespace_labels? ? { namespaced: namespace } : {} + + @metrics.add("sidekiq_jobs_processed_total", stats.processed, **labels) + @metrics.add("sidekiq_jobs_failed_total", stats.failed, **labels) + @metrics.add("sidekiq_jobs_enqueued_size", stats.enqueued, **labels) + @metrics.add("sidekiq_jobs_scheduled_size", stats.scheduled_size, **labels) + @metrics.add("sidekiq_jobs_retry_size", stats.retry_size, **labels) + @metrics.add("sidekiq_jobs_dead_size", stats.dead_size, **labels) + + @metrics.add("sidekiq_default_queue_latency_seconds", stats.default_queue_latency, **labels) + @metrics.add("sidekiq_processes_size", stats.processes_size, **labels) + @metrics.add("sidekiq_workers_size", stats.workers_size, **labels) + end + + def _probe_queues(namespace = true) + Sidekiq::Queue.all.each do |queue| + labels = { name: queue.name } + labels[:namespaced] = namespace if add_namespace_labels? + + @metrics.add("sidekiq_queue_size", queue.size, **labels) + @metrics.add("sidekiq_queue_latency_seconds", queue.latency, **labels) + @metrics.add("sidekiq_queue_paused", queue.paused? ? 1 : 0, **labels) + end + end + + def _probe_jobs_limit(namespace = true) + labels = add_namespace_labels? ? { namespaced: namespace } : {} + job_stats = Hash.new(0) + + Sidekiq::Queue.all.each do |queue| + Sidekiq.redis do |conn| + conn.lrange("queue:#{queue.name}", 0, PROBE_JOBS_LIMIT).each do |job| + job_class = Sidekiq.load_json(job)["class"] + + job_stats[job_class] += 1 + end + end + end + + job_stats.each do |class_name, count| + @metrics.add("sidekiq_enqueued_jobs", count, **labels.merge({ name: class_name })) + end + end + + def _probe_retries(namespace = true) + labels = add_namespace_labels? ? { namespaced: namespace } : {} + retry_stats = Hash.new(0) + + Sidekiq::RetrySet.new.map do |job| + retry_stats[job.klass] += 1 + end + + retry_stats.each do |class_name, count| + @metrics.add("sidekiq_to_be_retried_jobs", count, **labels.merge({ name: class_name })) + end + end + + def with_sidekiq(namespaced = true) # TODO: this is not concurrent safe as we change global context # It means that we are unable to use many different sidekiq's # which is not a problem as of now Sidekiq.configure_client do |config| - config.redis = self.class.connection_pool[redis_options] + config.redis = self.class.connection_pool[redis_options(namespaced)] end return unless connected? yield end - def redis_options + def redis_options(namespaced = true) options = { url: @opts[:redis_url], - namespace: "resque:gitlab", connect_timeout: 1, reconnect_attempts: 0 } + options[:namespace] = "resque:gitlab" if namespaced options[:id] = nil unless redis_enable_client? options end def redis_enable_client? @@ -216,8 +249,13 @@ end rescue Redis::BaseConnectionError => e @logger&.error "Error connecting to the Redis: #{e}" @connected = false end + + def add_namespace_labels? + @probe_namespaced && @probe_non_namespaced + end end + # rubocop:enable Metrics/ClassLength end end