lib/gitlab_exporter/sidekiq.rb in gitlab-exporter-13.5.0 vs lib/gitlab_exporter/sidekiq.rb in gitlab-exporter-14.0.0
- old
+ new
@@ -5,11 +5,10 @@
module GitLab
module Exporter
# A prober for Sidekiq queues
#
# It takes the Redis URL Sidekiq is connected to
- # rubocop:disable Metrics/ClassLength
class SidekiqProber
# The maximum depth (from the head) of each queue to probe. Probing the
# entirety of a very large queue will take longer and run the risk of
# timing out. But when we have a very large queue, we are most in need of
# reliable metrics. This trades off completeness for predictability by
@@ -39,28 +38,39 @@
def initialize(metrics: PrometheusMetrics.new, logger: nil, **opts)
@opts = opts
@metrics = metrics
@logger = logger
-
- @probe_namespaced = !!opts[:probe_namespaced] # rubocop:disable Style/DoubleNegation
- @probe_non_namespaced = !!opts[:probe_non_namespaced] # rubocop:disable Style/DoubleNegation
-
- # to maintain backward compatibility if the config is missing values
- @probe_namespaced = true if opts[:probe_namespaced].nil? && opts[:probe_non_namespaced].nil?
end
def probe_stats
- with_sidekiq { _probe_stats } if @probe_namespaced
- with_sidekiq(false) { _probe_stats(false) } if @probe_non_namespaced
+ with_sidekiq do
+ stats = Sidekiq::Stats.new
+ @metrics.add("sidekiq_jobs_processed_total", stats.processed)
+ @metrics.add("sidekiq_jobs_failed_total", stats.failed)
+ @metrics.add("sidekiq_jobs_enqueued_size", stats.enqueued)
+ @metrics.add("sidekiq_jobs_scheduled_size", stats.scheduled_size)
+ @metrics.add("sidekiq_jobs_retry_size", stats.retry_size)
+ @metrics.add("sidekiq_jobs_dead_size", stats.dead_size)
+
+ @metrics.add("sidekiq_default_queue_latency_seconds", stats.default_queue_latency)
+ @metrics.add("sidekiq_processes_size", stats.processes_size)
+ @metrics.add("sidekiq_workers_size", stats.workers_size)
+ end
+
self
end
def probe_queues
- with_sidekiq { _probe_queues } if @probe_namespaced
- with_sidekiq(false) { _probe_queues(false) } if @probe_non_namespaced
+ with_sidekiq do
+ Sidekiq::Queue.all.each do |queue|
+ @metrics.add("sidekiq_queue_size", queue.size, name: queue.name)
+ @metrics.add("sidekiq_queue_latency_seconds", queue.latency, name: queue.name)
+ @metrics.add("sidekiq_queue_paused", queue.paused? ? 1 : 0, name: queue.name)
+ end
+ end
self
end
def probe_jobs
@@ -69,38 +79,89 @@
self
end
def probe_future_sets
- with_sidekiq { _probe_future_sets } if @probe_namespaced
- with_sidekiq(false) { _probe_future_sets(false) } if @probe_non_namespaced
+ now = Time.now.to_f
+ with_sidekiq do
+ Sidekiq.redis do |conn|
+ Sidekiq::Scheduled::SETS.each do |set|
+ # Default to 0; if all jobs are due in the future, there is no "negative" delay.
+ delay = 0
+
+ _job, timestamp = conn.zrangebyscore(set, "-inf", now.to_s, limit: [0, 1], withscores: true).first
+ delay = now - timestamp if timestamp
+
+ @metrics.add("sidekiq_#{set}_set_processing_delay_seconds", delay)
+
+ # zcount is O(log(N)) (prob. binary search), so is still quick even with large sets
+ @metrics.add("sidekiq_#{set}_set_backlog_count",
+ conn.zcount(set, "-inf", now.to_s))
+ end
+ end
+ end
end
# Count worker classes present in Sidekiq queues. This only looks at the
# first PROBE_JOBS_LIMIT jobs in each queue. This means that we run a
# single LRANGE command for each queue, which does not block other
# commands. For queues over PROBE_JOBS_LIMIT in size, this means that we
# will not have completely accurate statistics, but the probe performance
# will also not degrade as the queue gets larger.
def probe_jobs_limit
- with_sidekiq { _probe_jobs_limit } if @probe_namespaced
- with_sidekiq(false) { _probe_jobs_limit(false) } if @probe_non_namespaced
+ with_sidekiq do
+ job_stats = Hash.new(0)
+ Sidekiq::Queue.all.each do |queue|
+ Sidekiq.redis do |conn|
+ conn.lrange("queue:#{queue.name}", 0, PROBE_JOBS_LIMIT).each do |job|
+ job_class = Sidekiq.load_json(job)["class"]
+
+ job_stats[job_class] += 1
+ end
+ end
+ end
+
+ job_stats.each do |class_name, count|
+ @metrics.add("sidekiq_enqueued_jobs", count, name: class_name)
+ end
+ end
+
self
end
def probe_workers
- with_sidekiq { _probe_workers } if @probe_namespaced
- with_sidekiq(false) { _probe_workers(false) } if @probe_non_namespaced
+ with_sidekiq do
+ worker_stats = Hash.new(0)
+ Sidekiq::Workers.new.map do |_pid, _tid, work|
+ job_klass = work["payload"]["class"]
+
+ worker_stats[job_klass] += 1
+ end
+
+ worker_stats.each do |class_name, count|
+ @metrics.add("sidekiq_running_jobs", count, name: class_name)
+ end
+ end
+
self
end
def probe_retries
- with_sidekiq { _probe_retries } if @probe_namespaced
- with_sidekiq(false) { _probe_retries(false) } if @probe_non_namespaced
+ with_sidekiq do
+ retry_stats = Hash.new(0)
+ Sidekiq::RetrySet.new.map do |job|
+ retry_stats[job.klass] += 1
+ end
+
+ retry_stats.each do |class_name, count|
+ @metrics.add("sidekiq_to_be_retried_jobs", count, name: class_name)
+ end
+ end
+
self
end
def probe_dead
puts "[DEPRECATED] probe_dead is now considered obsolete and will be removed in future major versions,"\
@@ -117,124 +178,29 @@
target.write(@metrics.to_s)
end
private
- def _probe_workers(namespace = true)
- labels = add_namespace_labels? ? { namespaced: namespace } : {}
- worker_stats = Hash.new(0)
-
- Sidekiq::Workers.new.map do |_pid, _tid, work|
- job_klass = work["payload"]["class"]
-
- worker_stats[job_klass] += 1
- end
-
- worker_stats.each do |class_name, count|
- @metrics.add("sidekiq_running_jobs", count, **labels.merge({ name: class_name }))
- end
- end
-
- def _probe_future_sets(namespace = true)
- labels = add_namespace_labels? ? { namespaced: namespace } : {}
- now = Time.now.to_f
- Sidekiq.redis do |conn|
- Sidekiq::Scheduled::SETS.each do |set|
- # Default to 0; if all jobs are due in the future, there is no "negative" delay.
- delay = 0
-
- _job, timestamp = conn.zrangebyscore(set, "-inf", now.to_s, limit: [0, 1], withscores: true).first
- delay = now - timestamp if timestamp
-
- @metrics.add("sidekiq_#{set}_set_processing_delay_seconds", delay, **labels)
-
- # zcount is O(log(N)) (prob. binary search), so is still quick even with large sets
- @metrics.add("sidekiq_#{set}_set_backlog_count",
- conn.zcount(set, "-inf", now.to_s), **labels)
- end
- end
- end
-
- def _probe_stats(namespace = true)
- stats = Sidekiq::Stats.new
- labels = add_namespace_labels? ? { namespaced: namespace } : {}
-
- @metrics.add("sidekiq_jobs_processed_total", stats.processed, **labels)
- @metrics.add("sidekiq_jobs_failed_total", stats.failed, **labels)
- @metrics.add("sidekiq_jobs_enqueued_size", stats.enqueued, **labels)
- @metrics.add("sidekiq_jobs_scheduled_size", stats.scheduled_size, **labels)
- @metrics.add("sidekiq_jobs_retry_size", stats.retry_size, **labels)
- @metrics.add("sidekiq_jobs_dead_size", stats.dead_size, **labels)
-
- @metrics.add("sidekiq_default_queue_latency_seconds", stats.default_queue_latency, **labels)
- @metrics.add("sidekiq_processes_size", stats.processes_size, **labels)
- @metrics.add("sidekiq_workers_size", stats.workers_size, **labels)
- end
-
- def _probe_queues(namespace = true)
- Sidekiq::Queue.all.each do |queue|
- labels = { name: queue.name }
- labels[:namespaced] = namespace if add_namespace_labels?
-
- @metrics.add("sidekiq_queue_size", queue.size, **labels)
- @metrics.add("sidekiq_queue_latency_seconds", queue.latency, **labels)
- @metrics.add("sidekiq_queue_paused", queue.paused? ? 1 : 0, **labels)
- end
- end
-
- def _probe_jobs_limit(namespace = true)
- labels = add_namespace_labels? ? { namespaced: namespace } : {}
- job_stats = Hash.new(0)
-
- Sidekiq::Queue.all.each do |queue|
- Sidekiq.redis do |conn|
- conn.lrange("queue:#{queue.name}", 0, PROBE_JOBS_LIMIT).each do |job|
- job_class = Sidekiq.load_json(job)["class"]
-
- job_stats[job_class] += 1
- end
- end
- end
-
- job_stats.each do |class_name, count|
- @metrics.add("sidekiq_enqueued_jobs", count, **labels.merge({ name: class_name }))
- end
- end
-
- def _probe_retries(namespace = true)
- labels = add_namespace_labels? ? { namespaced: namespace } : {}
- retry_stats = Hash.new(0)
-
- Sidekiq::RetrySet.new.map do |job|
- retry_stats[job.klass] += 1
- end
-
- retry_stats.each do |class_name, count|
- @metrics.add("sidekiq_to_be_retried_jobs", count, **labels.merge({ name: class_name }))
- end
- end
-
- def with_sidekiq(namespaced = true)
+ def with_sidekiq
SIDEKIQ_REDIS_LOCK.synchronize {
Sidekiq.configure_client do |config|
- config.redis = self.class.connection_pool[redis_options(namespaced)]
+ config.redis = self.class.connection_pool[redis_options]
end
return unless connected?
yield
}
end
- def redis_options(namespaced = true)
+ def redis_options
options = {
url: @opts[:redis_url],
connect_timeout: 1,
reconnect_attempts: 0
}
- options[:namespace] = "resque:gitlab" if namespaced
options[:id] = nil unless redis_enable_client?
options
end
def redis_enable_client?
@@ -251,13 +217,8 @@
end
rescue Redis::BaseConnectionError => e
@logger&.error "Error connecting to the Redis: #{e}"
@connected = false
end
-
- def add_namespace_labels?
- @probe_namespaced && @probe_non_namespaced
- end
end
- # rubocop:enable Metrics/ClassLength
end
end