Sha256: 0a46cac97662c9fd73f7f869a700bdfec7741cb6b355062170ebb4bddda0b315
Contents?: true
Size: 1.99 KB
Versions: 4
Compression:
Stored size: 1.99 KB
Contents
# frozen_string_literal: true module SidekiqUniqueJobs module OnConflict # Strategy to replace the job on conflict # # @author Mikael Henriksson <mikael@mhenrixon.com> class Replace < OnConflict::Strategy # # @!attribute [r] queue # @return [String] rthe sidekiq queue this job belongs to attr_reader :queue # # @!attribute [r] unique_digest # @return [String] the unique digest to use for locking attr_reader :unique_digest # # Initialize a new Replace strategy # # @param [Hash] item sidekiq job hash # def initialize(item, redis_pool = nil) super(item, redis_pool) @queue = item[QUEUE] @unique_digest = item[LOCK_DIGEST] end # # Replace the old job in the queue # # # @return [void] <description> # # @yield to retry the lock after deleting the old one # def call(&block) return unless (deleted_job = delete_job_by_digest) log_info("Deleting job: #{deleted_job}") if (del_count = delete_lock) log_info("Deleted `#{del_count}` keys for #{unique_digest}") end block&.call end # # Delete the job from either schedule, retry or the queue # # # @return [String] the deleted job hash # @return [nil] when deleting nothing # def delete_job_by_digest call_script(:delete_job_by_digest, keys: ["#{QUEUE}:#{queue}", SCHEDULE, RETRY], argv: [unique_digest]) end # # Delete the keys belonging to the job # # # @return [Integer] the number of keys deleted # def delete_lock digests.delete_by_digest(unique_digest) end # # Access to the {Digests} # # # @return [Digests] and instance with digests # def digests @digests ||= SidekiqUniqueJobs::Digests.new end end end end
Version data entries
4 entries across 4 versions & 1 rubygems