# encoding: utf-8 require 'one_apm/collector/sampler' require 'one_apm/inst/background_job/delayed_job_injection' module OneApm module Collector module Samplers class DelayedJobSampler < OneApm::Collector::Sampler named :delayed_job def self.supported_backend? ::Delayed::Worker.backend.to_s == "Delayed::Backend::ActiveRecord::Job" end def initialize raise Unsupported, "DJ queue sampler disabled" if OneApm::Manager.config[:disable_dj] raise Unsupported, "DJ queue sampling unsupported with backend '#{::Delayed::Worker.backend}'" unless self.class.supported_backend? raise Unsupported, "No DJ worker present. Skipping DJ queue sampler" unless OneApm::DelayedJobInjection.worker_name end def record_failed_jobs(value) OneApm::Manager.record_metric("Workers/DelayedJob/failed_jobs", value) end def record_locked_jobs(value) OneApm::Manager.record_metric("Workers/DelayedJob/locked_jobs", value) end OA_FAILED_QUERY = 'failed_at is not NULL'.freeze OA_LOCKED_QUERY = 'locked_by is not NULL'.freeze def failed_jobs count(OA_FAILED_QUERY) end def locked_jobs count(OA_LOCKED_QUERY) end def count(query) if ::ActiveRecord::VERSION::MAJOR.to_i < 4 ::Delayed::Job.count(query) else ::Delayed::Job.where(query).count end end def self.supported_on_this_platform? defined?(::Delayed::Job) end def poll record_failed_jobs(failed_jobs) record_locked_jobs(locked_jobs) record_queue_length_metrics end private def record_queue_length_metrics counts = [] counts << record_counts_by("queue", "name") if ::Delayed::Job.instance_methods.include?(:queue) counts << record_counts_by("priority") all_metric = "Workers/DelayedJob/queue_length/all" OneApm::Manager.record_metric(all_metric, counts.max) end OA_QUEUE_QUERY_CONDITION = 'run_at <= ? and failed_at is NULL'.freeze def record_counts_by(column_name, metric_segment = column_name) all_count = 0 queue_counts(column_name).each do |column_val, count| all_count += count column_val = "default" if column_val.nil? || column_val == "" metric = "Workers/DelayedJob/queue_length/#{metric_segment}/#{column_val}" OneApm::Manager.record_metric(metric, count) end all_count end def queue_counts(column_name) now = ::Delayed::Job.db_time_now result = if ::ActiveRecord::VERSION::MAJOR.to_i < 4 ::Delayed::Job.count(:group => column_name, :conditions => [OA_QUEUE_QUERY_CONDITION, now]) else ::Delayed::Job.where(OA_QUEUE_QUERY_CONDITION, now). group(column_name). count end result.to_a end end end end end