Sha256: f978b87e4e2dd0a881cd5acf74eefdc1dc3f1a30f5c40bd9a7a248ac30922384

Contents?: true

Size: 945 Bytes

Versions: 1

Compression:

Stored size: 945 Bytes

Contents

module AtomicSidekiq
  # Finds every in-flight key belonging to one queue and hands each of them
  # to the expire operation.
  #
  # Keys are discovered with a cursor-driven Redis SCAN over the pattern
  # "<in_flight_prefix><queue>:*". What expiring a key actually does is
  # decided by AtomicOperation::Expire, which is defined outside this file.
  class DeadJobCollector
    # Runs one collection pass per queue, building a collector for each.
    #
    # @param queues [#each] queue names to sweep
    # @return [Object] whatever +queues.each+ returns
    def self.collect!(queues)
      queues.each do |queue_name|
        new(queue_name).collect!
      end
    end

    # @param queue [#to_s] queue name, interpolated into the SCAN pattern
    # @param in_flight_prefix [#to_s] prefix of the in-flight keys; defaults
    #   to AtomicFetch::IN_FLIGHT_KEY_PREFIX
    def initialize(queue, in_flight_prefix: AtomicFetch::IN_FLIGHT_KEY_PREFIX)
      @expire_op        = AtomicOperation::Expire.new
      @in_flight_prefix = in_flight_prefix
      @queue            = queue
    end

    # Passes every matching in-flight key of this queue to the expire
    # operation, in the order SCAN reports them.
    def collect!
      each_keys do |job_key|
        expire!(job_key)
      end
    end

    private

    attr_reader :queue, :in_flight_prefix, :expire_op

    # Delegates a single key, together with the queue, to the expire operation.
    def expire!(job_key)
      expire_op.perform(queue, job_key)
    end

    # Yields each key matching #keys_prefix, one SCAN page at a time.
    #
    # The scan starts at cursor 0 and stops as soon as the server hands back
    # cursor 0 again. The returned cursor is normalised with #to_i before it
    # is compared and before it is sent back for the next page.
    def each_keys
      pattern = keys_prefix

      Sidekiq.redis do |conn|
        cursor = 0

        loop do
          next_cursor, job_keys = conn.scan(cursor, match: pattern)
          job_keys.each { |job_key| yield job_key }

          cursor = next_cursor.to_i
          break if cursor.zero?
        end
      end
    end

    # Glob pattern matching this queue's in-flight keys.
    def keys_prefix
      "#{in_flight_prefix}#{queue}:*"
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
atomic-sidekiq-1.0.0 lib/atomic_sidekiq/dead_job_collector.rb