Sha256: 0e7cb04fac6512afa136e8e0ebb2a21d7433e842791f867de09a5144750a28cb
Contents?: true
Size: 1.34 KB
Versions: 2
Compression:
Stored size: 1.34 KB
Contents
module AtomicSidekiq class DeadJobCollector class << self def collect!(queues, in_flight_keymaker:, skip_recovery_queues: []) queues.each do |q| new(q, in_flight_keymaker: in_flight_keymaker) .collect!(skip_recovery: skip_recovery_queues.include?(q)) end end end def initialize(queue, in_flight_keymaker:) @recovered_stats = RecoveredStats.new @queue = queue @in_flight_keymaker = in_flight_keymaker @expire_op = AtomicOperation::Expire.new end def collect!(skip_recovery: false) each_keys { |job_key| expire!(job_key, skip_recovery: skip_recovery) } end private attr_reader :queue, :in_flight_keymaker, :expire_op, :recovered_stats def expire!(job_key, skip_recovery:) recovered = expire_op.perform(queue, job_key, recover: !skip_recovery) return if recovered.nil? job = JSON.parse(recovered[1]) recovered_stats.increment!(job) job end def each_keys it = 0 Sidekiq.redis do |conn| loop do it, job_keys = conn.scan(it, match: keys_prefix) it = it.to_i job_keys.each { |job_key| yield(job_key) } break if it.zero? end end end def keys_prefix in_flight_keymaker.queue_matcher(queue) end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
atomic-sidekiq-1.3.0 | lib/atomic_sidekiq/dead_job_collector.rb |
atomic-sidekiq-1.2.0 | lib/atomic_sidekiq/dead_job_collector.rb |