Sha256: 5a448d2bb6d5b5b26fd870edb17da37232e1e546dc1e003bbe3d3f08c0c96f3a

Contents?: true

Size: 981 Bytes

Versions: 1

Compression:

Stored size: 981 Bytes

Contents

module AtomicSidekiq
  # Read/delete access to the job payloads stored in Redis under the
  # in-flight key prefix (AtomicFetch::IN_FLIGHT_KEY_PREFIX).
  #
  # All Redis access goes through Sidekiq.redis; key patterns are produced
  # by InFlightKeymaker.
  class InFlightQueue
    def initialize
      @keymaker = InFlightKeymaker.new(AtomicFetch::IN_FLIGHT_KEY_PREFIX)
    end

    # Lists every job currently stored under the in-flight prefix.
    #
    # Keys that disappear between being scanned and being read are skipped
    # instead of raising.
    #
    # @return [Array] the JSON-decoded payload of each in-flight key
    def list
      retrieve_jobs(scan_keys(keymaker.matcher))
    end

    # Deletes every in-flight key that matches the given job id.
    #
    # @param jid [String] job id handed to the keymaker's job matcher
    # @return [Integer] result of Redis DEL (number of keys removed), or 0
    #   when no key matched
    def delete_job(jid)
      job_keys = scan_keys(keymaker.job_matcher(jid))
      return 0 if job_keys.empty?

      Sidekiq.redis { |conn| conn.del(*job_keys) }
    end

    private

    attr_reader :keymaker

    # Collects all keys matching +pattern+ by walking the keyspace with SCAN.
    #
    # SCAN is used for both listing and deletion rather than KEYS, because
    # KEYS inspects the whole keyspace in a single blocking command.
    #
    # @param pattern [String] Redis glob-style match pattern
    # @return [Array<String>] matching keys, without duplicates
    def scan_keys(pattern)
      found = []
      cursor = 0
      loop do
        cursor, page = Sidekiq.redis { |conn| conn.scan(cursor, match: pattern) }
        found.concat(page)
        # The client may hand the cursor back as a String; "0" ends the scan.
        cursor = cursor.to_i
        break if cursor.zero?
      end
      # SCAN is allowed to return the same key more than once.
      found.uniq
    end

    # Fetches and JSON-decodes the value stored at each key.
    #
    # @param keys [Array<String>] keys to read
    # @return [Array] decoded payloads, in key order, omitting keys whose
    #   value no longer exists
    def retrieve_jobs(keys)
      Sidekiq.redis do |conn|
        keys.each_with_object([]) do |key, jobs|
          payload = conn.get(key)
          # A nil payload means the key vanished after it was scanned.
          jobs << JSON.parse(payload) if payload
        end
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
atomic-sidekiq-1.3.0 lib/atomic_sidekiq/in_flight_queue.rb