Sha256: 4ad33dc1237ae559eacf74f1dcea8cb7ab4cb21a4d4fba4892c8331ffa2017f7

Contents?: true

Size: 1.38 KB

Versions: 3

Compression:

Stored size: 1.38 KB

Contents

require 'sidekiq_queue_metrics/storage'

# Collects per-queue Sidekiq statistics (processed, failed, enqueued and
# in-retry counts) and exposes them through module-level methods.
module Sidekiq::QueueMetrics
  class << self
    # Builds a statistics snapshot for every queue returned by
    # Sidekiq::Queue.all.
    #
    # The three lookups are issued from separate `async` blocks wrapped in a
    # `together` block. Neither helper is defined in this file.
    # NOTE(review): presumably they come from a concurrency DSL loaded
    # elsewhere and `together` waits for all blocks to finish - confirm.
    #
    # @return [Hash] queue name => Hash with the keys 'processed', 'failed',
    #   'enqueued' and 'in_retry'; a missing value is reported as 0
    def fetch
      queues = []
      # Each collection gets its own Hash. A chained `a = b = c = {}` binds all
      # three names to one object, so the enqueued sizes written below would
      # also show up as processed/failed/retry data whenever one of the other
      # names is not rebound by its async block.
      success_and_failed_stats = {}
      enqueued_jobs = {}
      retry_stats = {}

      together do
        async do
          queues = Sidekiq::Queue.all.map(&:name).map(&:to_s)
          queues.each { |queue| enqueued_jobs[queue] = fetch_enqueued_jobs(queue) }
        end

        async { success_and_failed_stats = fetch_success_and_failed_stats }
        async { retry_stats = fetch_retry_stats }
      end

      queues.each_with_object({}) do |queue, result|
        # A queue without stored counters (or with a nil entry) reports zeros.
        counters = success_and_failed_stats[queue] || {}

        result[queue] = {
          'processed' => val_or_default(counters['processed']),
          'failed' => val_or_default(counters['failed']),
          'enqueued' => val_or_default(enqueued_jobs[queue]),
          'in_retry' => val_or_default(retry_stats[queue])
        }
      end
    end

    # Loads the stored statistics document returned by Storage.get_stats.
    #
    # NOTE(review): JSON.load is kept for compatibility. Depending on the json
    # gem version it may honour additions that JSON.parse ignores, so consider
    # switching to JSON.parse if the stored payload is not fully trusted.
    #
    # @return [Hash] the decoded document; an empty Hash when nothing is
    #   stored or the payload decodes to nil
    def fetch_success_and_failed_stats
      JSON.load(Storage.get_stats || '{}') || {}
    end

    # Looks up the current size of a single queue.
    #
    # @param queue [String] queue name
    # @return [Object] whatever Sidekiq::Queue#size returns for that queue
    def fetch_enqueued_jobs(queue)
      Sidekiq::Queue.new(queue).size
    end

    # Counts the entries of Sidekiq::RetrySet per queue.
    #
    # Counting in a single pass avoids holding an array of jobs per queue just
    # to call `count` on it.
    #
    # @return [Hash] queue (as returned by each entry's `queue`) => number of
    #   entries in the retry set for that queue
    def fetch_retry_stats
      Sidekiq::RetrySet.new.each_with_object({}) do |job, counts|
        counts[job.queue] = counts.fetch(job.queue, 0) + 1
      end
    end

    private

    # Returns `val`, or `default` when `val` is nil or false.
    def val_or_default(val, default = 0)
      val || default
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
sidekiq_queue_metrics-0.0.4 lib/sidekiq_queue_metrics/queue_metrics.rb
sidekiq_queue_metrics-0.0.3 lib/sidekiq_queue_metrics/queue_metrics.rb
sidekiq_queue_metrics-0.0.2 lib/sidekiq_queue_metrics/queue_metrics.rb