lib/gitlab_exporter/sidekiq.rb in gitlab-exporter-13.1.0 vs lib/gitlab_exporter/sidekiq.rb in gitlab-exporter-13.2.0
- old
+ new
@@ -5,10 +5,11 @@
module GitLab
module Exporter
# A prober for Sidekiq queues
#
# It takes the Redis URL Sidekiq is connected to
+ # rubocop:disable Metrics/ClassLength
class SidekiqProber
# The maximum depth (from the head) of each queue to probe. Probing the
# entirety of a very large queue will take longer and run the risk of
# timing out. But when we have a very large queue, we are most in need of
# reliable metrics. This trades off completeness for predictability by
@@ -35,39 +36,28 @@
def initialize(metrics: PrometheusMetrics.new, logger: nil, **opts)
@opts = opts
@metrics = metrics
@logger = logger
+
+ # Which Redis namespace modes to probe; both may be enabled at once.
+ @probe_namespaced = !!opts[:probe_namespaced] # rubocop:disable Style/DoubleNegation
+ @probe_non_namespaced = !!opts[:probe_non_namespaced] # rubocop:disable Style/DoubleNegation
+
+ # to maintain backward compatibility if the config is missing values
+ @probe_namespaced = true if opts[:probe_namespaced].nil? && opts[:probe_non_namespaced].nil?
end
def probe_stats
- with_sidekiq do
- stats = Sidekiq::Stats.new
+ # Collect aggregate Sidekiq stats once per enabled namespace mode;
+ # with_sidekiq(false) reconnects without the "resque:gitlab" namespace.
+ with_sidekiq { _probe_stats } if @probe_namespaced
+ with_sidekiq(false) { _probe_stats(false) } if @probe_non_namespaced
- @metrics.add("sidekiq_jobs_processed_total", stats.processed)
- @metrics.add("sidekiq_jobs_failed_total", stats.failed)
- @metrics.add("sidekiq_jobs_enqueued_size", stats.enqueued)
- @metrics.add("sidekiq_jobs_scheduled_size", stats.scheduled_size)
- @metrics.add("sidekiq_jobs_retry_size", stats.retry_size)
- @metrics.add("sidekiq_jobs_dead_size", stats.dead_size)
-
- @metrics.add("sidekiq_default_queue_latency_seconds", stats.default_queue_latency)
- @metrics.add("sidekiq_processes_size", stats.processes_size)
- @metrics.add("sidekiq_workers_size", stats.workers_size)
- end
-
self
end
def probe_queues
- with_sidekiq do
- Sidekiq::Queue.all.each do |queue|
- @metrics.add("sidekiq_queue_size", queue.size, name: queue.name)
- @metrics.add("sidekiq_queue_latency_seconds", queue.latency, name: queue.name)
- @metrics.add("sidekiq_queue_paused", queue.paused? ? 1 : 0, name: queue.name)
- end
- end
+ # Collect per-queue size/latency/paused metrics once per enabled namespace mode.
+ with_sidekiq { _probe_queues } if @probe_namespaced
+ with_sidekiq(false) { _probe_queues(false) } if @probe_non_namespaced
self
end
def probe_jobs
@@ -76,89 +66,38 @@
self
end
def probe_future_sets
- now = Time.now.to_f
- with_sidekiq do
- Sidekiq.redis do |conn|
- Sidekiq::Scheduled::SETS.each do |set|
- # Default to 0; if all jobs are due in the future, there is no "negative" delay.
- delay = 0
-
- _job, timestamp = conn.zrangebyscore(set, "-inf", now.to_s, limit: [0, 1], withscores: true).first
- delay = now - timestamp if timestamp
-
- @metrics.add("sidekiq_#{set}_set_processing_delay_seconds", delay)
-
- # zcount is O(log(N)) (prob. binary search), so is still quick even with large sets
- @metrics.add("sidekiq_#{set}_set_backlog_count",
- conn.zcount(set, "-inf", now.to_s))
- end
- end
- end
+ # Probe scheduled/retry set delay and backlog once per enabled namespace mode.
+ # NOTE(review): unlike the other probe_* methods this does not return self —
+ # confirm callers do not rely on chaining here.
+ with_sidekiq { _probe_future_sets } if @probe_namespaced
+ with_sidekiq(false) { _probe_future_sets(false) } if @probe_non_namespaced
end
# Count worker classes present in Sidekiq queues. This only looks at the
# first PROBE_JOBS_LIMIT jobs in each queue. This means that we run a
# single LRANGE command for each queue, which does not block other
# commands. For queues over PROBE_JOBS_LIMIT in size, this means that we
# will not have completely accurate statistics, but the probe performance
# will also not degrade as the queue gets larger.
def probe_jobs_limit
- with_sidekiq do
- job_stats = Hash.new(0)
+ # Count enqueued jobs by class (bounded by PROBE_JOBS_LIMIT per queue),
+ # once per enabled namespace mode.
+ with_sidekiq { _probe_jobs_limit } if @probe_namespaced
+ with_sidekiq(false) { _probe_jobs_limit(false) } if @probe_non_namespaced
- Sidekiq::Queue.all.each do |queue|
- Sidekiq.redis do |conn|
- conn.lrange("queue:#{queue.name}", 0, PROBE_JOBS_LIMIT).each do |job|
- job_class = Sidekiq.load_json(job)["class"]
-
- job_stats[job_class] += 1
- end
- end
- end
-
- job_stats.each do |class_name, count|
- @metrics.add("sidekiq_enqueued_jobs", count, name: class_name)
- end
- end
-
self
end
def probe_workers
- with_sidekiq do
- worker_stats = Hash.new(0)
+ # Count currently running jobs by worker class, once per enabled namespace mode.
+ with_sidekiq { _probe_workers } if @probe_namespaced
+ with_sidekiq(false) { _probe_workers(false) } if @probe_non_namespaced
- Sidekiq::Workers.new.map do |_pid, _tid, work|
- job_klass = work["payload"]["class"]
-
- worker_stats[job_klass] += 1
- end
-
- worker_stats.each do |class_name, count|
- @metrics.add("sidekiq_running_jobs", count, name: class_name)
- end
- end
-
self
end
def probe_retries
- with_sidekiq do
- retry_stats = Hash.new(0)
+ # Count jobs awaiting retry by class, once per enabled namespace mode.
+ with_sidekiq { _probe_retries } if @probe_namespaced
+ with_sidekiq(false) { _probe_retries(false) } if @probe_non_namespaced
- Sidekiq::RetrySet.new.map do |job|
- retry_stats[job.klass] += 1
- end
-
- retry_stats.each do |class_name, count|
- @metrics.add("sidekiq_to_be_retried_jobs", count, name: class_name)
- end
- end
-
self
end
def probe_dead
puts "[DEPRECATED] probe_dead is now considered obsolete and will be removed in future major versions,"\
@@ -175,31 +114,125 @@
target.write(@metrics.to_s)
end
private
- def with_sidekiq
+ def _probe_workers(namespace = true)
+ # Tally in-flight jobs per worker class (from Sidekiq::Workers) and emit
+ # one sidekiq_running_jobs sample per class. A namespaced label is added
+ # only when both probe modes are active, so the two series stay distinct.
+ labels = add_namespace_labels? ? { namespaced: namespace } : {}
+ worker_stats = Hash.new(0)
+
+ Sidekiq::Workers.new.map do |_pid, _tid, work|
+ job_klass = work["payload"]["class"]
+
+ worker_stats[job_klass] += 1
+ end
+
+ worker_stats.each do |class_name, count|
+ @metrics.add("sidekiq_running_jobs", count, **labels.merge({ name: class_name }))
+ end
+ end
+
+ def _probe_future_sets(namespace = true)
+ # For each of Sidekiq's scheduled sets, report the processing delay of
+ # the oldest overdue entry and the count of entries already due.
+ labels = add_namespace_labels? ? { namespaced: namespace } : {}
+ now = Time.now.to_f
+ Sidekiq.redis do |conn|
+ Sidekiq::Scheduled::SETS.each do |set|
+ # Default to 0; if all jobs are due in the future, there is no "negative" delay.
+ delay = 0
+
+ _job, timestamp = conn.zrangebyscore(set, "-inf", now.to_s, limit: [0, 1], withscores: true).first
+ delay = now - timestamp if timestamp
+
+ @metrics.add("sidekiq_#{set}_set_processing_delay_seconds", delay, **labels)
+
+ # zcount is O(log(N)) (prob. binary search), so is still quick even with large sets
+ @metrics.add("sidekiq_#{set}_set_backlog_count",
+ conn.zcount(set, "-inf", now.to_s), **labels)
+ end
+ end
+ end
+
+ def _probe_stats(namespace = true)
+ # Emit the aggregate counters exposed by Sidekiq::Stats: processed/failed
+ # totals, enqueued/scheduled/retry/dead sizes, default-queue latency, and
+ # process/worker counts.
+ stats = Sidekiq::Stats.new
+ labels = add_namespace_labels? ? { namespaced: namespace } : {}
+
+ @metrics.add("sidekiq_jobs_processed_total", stats.processed, **labels)
+ @metrics.add("sidekiq_jobs_failed_total", stats.failed, **labels)
+ @metrics.add("sidekiq_jobs_enqueued_size", stats.enqueued, **labels)
+ @metrics.add("sidekiq_jobs_scheduled_size", stats.scheduled_size, **labels)
+ @metrics.add("sidekiq_jobs_retry_size", stats.retry_size, **labels)
+ @metrics.add("sidekiq_jobs_dead_size", stats.dead_size, **labels)
+
+ @metrics.add("sidekiq_default_queue_latency_seconds", stats.default_queue_latency, **labels)
+ @metrics.add("sidekiq_processes_size", stats.processes_size, **labels)
+ @metrics.add("sidekiq_workers_size", stats.workers_size, **labels)
+ end
+
+ def _probe_queues(namespace = true)
+ # Per-queue size, latency and paused flag, labelled by queue name
+ # (plus the namespaced label when both probe modes are active).
+ Sidekiq::Queue.all.each do |queue|
+ labels = { name: queue.name }
+ labels[:namespaced] = namespace if add_namespace_labels?
+
+ @metrics.add("sidekiq_queue_size", queue.size, **labels)
+ @metrics.add("sidekiq_queue_latency_seconds", queue.latency, **labels)
+ @metrics.add("sidekiq_queue_paused", queue.paused? ? 1 : 0, **labels)
+ end
+ end
+
+ def _probe_jobs_limit(namespace = true)
+ # Tally enqueued jobs by class, reading at most PROBE_JOBS_LIMIT entries
+ # from the head of each queue via a single LRANGE per queue.
+ labels = add_namespace_labels? ? { namespaced: namespace } : {}
+ job_stats = Hash.new(0)
+
+ Sidekiq::Queue.all.each do |queue|
+ Sidekiq.redis do |conn|
+ conn.lrange("queue:#{queue.name}", 0, PROBE_JOBS_LIMIT).each do |job|
+ job_class = Sidekiq.load_json(job)["class"]
+
+ job_stats[job_class] += 1
+ end
+ end
+ end
+
+ job_stats.each do |class_name, count|
+ @metrics.add("sidekiq_enqueued_jobs", count, **labels.merge({ name: class_name }))
+ end
+ end
+
+ def _probe_retries(namespace = true)
+ # Count jobs sitting in the retry set, grouped by job class.
+ labels = add_namespace_labels? ? { namespaced: namespace } : {}
+ retry_stats = Hash.new(0)
+
+ Sidekiq::RetrySet.new.map do |job|
+ retry_stats[job.klass] += 1
+ end
+
+ retry_stats.each do |class_name, count|
+ @metrics.add("sidekiq_to_be_retried_jobs", count, **labels.merge({ name: class_name }))
+ end
+ end
+
+ def with_sidekiq(namespaced = true)
+ # Point the global Sidekiq client at a pooled Redis connection configured
+ # for the requested namespace mode, then yield only if the connection is up.
# TODO: this is not concurrent safe as we change global context
# It means that we are unable to use many different sidekiq's
# which is not a problem as of now
Sidekiq.configure_client do |config|
- config.redis = self.class.connection_pool[redis_options]
+ config.redis = self.class.connection_pool[redis_options(namespaced)]
end
return unless connected?
yield
end
- def redis_options
+ def redis_options(namespaced = true)
+ # Build the Redis client options; the legacy "resque:gitlab" namespace is
+ # applied only when probing in namespaced mode.
options = {
url: @opts[:redis_url],
- namespace: "resque:gitlab",
connect_timeout: 1,
reconnect_attempts: 0
}
+ options[:namespace] = "resque:gitlab" if namespaced
options[:id] = nil unless redis_enable_client?
options
end
def redis_enable_client?
@@ -216,8 +249,13 @@
end
rescue Redis::BaseConnectionError => e
@logger&.error "Error connecting to the Redis: #{e}"
@connected = false
end
+
+ # True when both namespace modes are probed, in which case every sample is
+ # tagged with a namespaced: true/false label so the two series do not collide.
+ def add_namespace_labels?
+ @probe_namespaced && @probe_non_namespaced
+ end
end
+ # rubocop:enable Metrics/ClassLength
end
end