Sha256: 5036a67a1c689c9bf5451f2b9b987da7a04826e4953c96ff43dc51d342e81e00
Contents?: true
Size: 1.96 KB
Versions: 1
Compression:
Stored size: 1.96 KB
Contents
module QueueClassicPlus module UpdateMetrics def self.update { "queued" => "queue_classic_jobs", "scheduled" => "queue_classic_later_jobs", }.each do |type, table| next unless ActiveRecord::Base.connection.table_exists?(table) q = "SELECT q_name, COUNT(1) FROM #{table} GROUP BY q_name" results = execute(q) # Log individual queue sizes results.each do |h| Metrics.measure("qc.jobs_#{type}", h.fetch('count').to_i, source: h.fetch('q_name')) end end # Log oldest locked_at and created_at ['locked_at', 'created_at'].each do |column| age = max_age(column) Metrics.measure("qc.max_#{column}", age) end # Log oldes unlocked jobs age = max_age("created_at", "locked_at IS NULL") Metrics.measure("qc.max_created_at.unlocked", age) if ActiveRecord::Base.connection.table_exists?('queue_classic_later_jobs') lag = execute("SELECT MAX(EXTRACT(EPOCH FROM now() - not_before)) AS lag FROM queue_classic_later_jobs").first lag = lag ? lag['lag'] : 0 Metrics.measure("qc.jobs_delayed.lag", lag.to_f) nb_late = execute("SELECT COUNT(1) FROM queue_classic_later_jobs WHERE not_before < NOW()").first nb_late = nb_late ? nb_late['count'] : 0 Metrics.measure("qc.jobs_delayed.late_count", nb_late.to_i) end end private def self.max_age(column, *conditions) conditions.unshift "q_name != '#{::QueueClassicPlus::CustomWorker::FailedQueue.name}'" q = "SELECT EXTRACT(EPOCH FROM now() - #{column}) AS age_in_seconds FROM queue_classic_jobs WHERE #{conditions.join(" AND ")} ORDER BY age_in_seconds DESC " age_info = execute(q).first age_info ? age_info['age_in_seconds'].to_i : 0 end def self.execute(q) ActiveRecord::Base.connection.execute(q) end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
queue_classic_plus-0.0.2 | lib/queue_classic_plus/update_metrics.rb |