Sha256: 71dda136ff50e6b25ba1d53b1f20f01160493b8034029c5e3f940cce21489d11

Contents?: true

Size: 1.54 KB

Versions: 2

Compression:

Stored size: 1.54 KB

Contents

module Sidekiq
  module Monitor
    class Cleaner
      # Cleans up records that are no longer in sync with Sidekiq's records
      def clean
        clean_queued
        clean_running
      end

      private

      def clean_queued
        Sidekiq.redis do |conn|
          queues = conn.smembers('queues')
          queued_jids = []
          queues.each do |queue|
            workers = conn.lrange("queue:#{queue}", 0, -1)
            workers.each do |worker|
              worker = Sidekiq.load_json(worker)
              queued_jids << worker['jid']
            end
          end

          Sidekiq::Monitor::Job.where(status: 'queued').each do |job|
            unless queued_jids.include?(job.jid)
              job.update_attributes(
                finished_at: DateTime.now,
                status: 'interrupted'
              )
            end
          end
        end
      end

      def clean_running
        Sidekiq.redis do |conn|
          workers = conn.smembers('workers')
          busy_jids = []
          workers.each do |worker|
            worker = conn.get("worker:#{worker}")
            next if worker.blank?
            worker = Sidekiq.load_json(worker)
            busy_jids << worker['payload']['jid']
          end

          Sidekiq::Monitor::Job.where(status: 'running').each do |job|
            unless busy_jids.include?(job.jid)
              job.update_attributes(
                finished_at: DateTime.now,
                status: 'interrupted'
              )
            end
          end
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
sidekiq_monitor-0.0.2 lib/sidekiq/monitor/cleaner.rb
sidekiq_monitor-0.0.1 lib/sidekiq/monitor/cleaner.rb