Sha256: 23b3451ebbcb7da61ed27c9ebe00bfd1e57186f6adb739aa7b26c4e1dee7d144

Contents?: true

Size: 1.99 KB

Versions: 16

Compression:

Stored size: 1.99 KB

Contents

# frozen_string_literal: true

module SidekiqUniqueJobs
  module OnConflict
    # Conflict strategy that replaces the existing job.
    #
    # On conflict the old job is deleted by its digest, the keys belonging to
    # that digest are deleted, and the given block (if any) is then called.
    #
    # @author Mikael Henriksson <mikael@zoolutions.se>
    class Replace < OnConflict::Strategy
      #
      # @!attribute [r] queue
      #   @return [String] the sidekiq queue this job belongs to
      # @!attribute [r] unique_digest
      #   @return [String] the unique digest to use for locking
      attr_reader :queue, :unique_digest

      #
      # Initialize a new Replace strategy
      #
      # @param [Hash] item sidekiq job hash; read for its QUEUE and LOCK_DIGEST entries
      # @param [Object, nil] redis_pool forwarded unchanged to the parent
      #   initializer (NOTE(review): presumably a redis connection pool — confirm
      #   against OnConflict::Strategy)
      #
      def initialize(item, redis_pool = nil)
        # Bare `super` forwards both arguments exactly as they were received.
        super
        @unique_digest = item[LOCK_DIGEST]
        @queue         = item[QUEUE]
      end

      #
      # Replace the old job in the queue
      #
      # Does nothing (and does not call the block) when no job was deleted.
      #
      # @return [nil] when no job was deleted or no block was given
      # @return [Object] otherwise, whatever the block returns
      #
      # @yield to retry the lock after deleting the old one
      #
      def call(&block)
        removed_job = delete_job_by_digest
        return unless removed_job

        log_info("Deleting job: #{removed_job}")

        # Only nil/false suppress this message; a count of 0 is truthy in Ruby
        # and is therefore still logged.
        removed_keys = delete_lock
        log_info("Deleted `#{removed_keys}` keys for #{unique_digest}") if removed_keys

        block&.call
      end

      #
      # Delete the job from either schedule, retry or the queue
      #
      # Runs the `:delete_job_by_digest` script with the queue key, SCHEDULE
      # and RETRY as keys and the unique digest as the only argument.
      #
      # @return [String] the deleted job hash
      # @return [nil] when deleting nothing
      #
      def delete_job_by_digest
        script_keys = ["#{QUEUE}:#{queue}", SCHEDULE, RETRY]
        call_script(:delete_job_by_digest, keys: script_keys, argv: [unique_digest])
      end

      #
      # Delete the keys belonging to the job
      #
      # Delegates to {Digests#delete_by_digest} using this job's unique digest.
      #
      # @return [Integer] the number of keys deleted
      #
      def delete_lock
        digests.delete_by_digest(unique_digest)
      end

      #
      # Access to the {Digests}
      #
      # The instance is created on first use and memoized afterwards.
      #
      # @return [Digests] an instance with digests
      #
      def digests
        @digests ||= SidekiqUniqueJobs::Digests.new
      end
    end
  end
end

Version data entries

16 entries across 16 versions & 1 rubygems

Version Path
sidekiq-unique-jobs-7.0.0.beta25 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta24 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta23 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta22 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta21 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta20 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta19 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta18 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta17 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta16 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta15 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta14 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta13 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta12 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta11 lib/sidekiq_unique_jobs/on_conflict/replace.rb
sidekiq-unique-jobs-7.0.0.beta10 lib/sidekiq_unique_jobs/on_conflict/replace.rb