# frozen_string_literal: true

require "json"
require "ostruct"
require "sidekiq_unique_jobs"

module CanvasSync::JobUniqueness
  # This class is intended to be the complete translation layer between CanvasSync::JobUniqueness and SidekiqUniqueJobs.
  # In other words, you could consider it the "locking backend" and thus could potentially swap out SUJ for a more succinct solution.
  #
  # SUJ's implementation is somewhat complex, but is somewhat pre-tailored over (eg) https://github.com/leandromoreira/redlock-rb.
  # Mainly SUJ tracks the JID so that if a process dies, another can pick up the job without having to figure out how to unlock it.
  # SUJ also handles the integration into Sidekiq Web, which is a nice bonus.
  class Locksmith < SidekiqUniqueJobs::Locksmith
    attr_reader :lock_context

    # Sets up the state the SUJ locksmith reads, sourcing everything from +lock_context+.
    #
    # @param key [String] the lock key; wrapped in a SidekiqUniqueJobs::Key
    # @param lock_context [Object] must respond to +lock_id+ and +config+ (a Hash-like object
    #   read with the keys :strategy, :ttl, :timeout and :limit)
    # @param redis_pool [Object, nil] stored as-is in +@redis_pool+
    def initialize(key, lock_context, redis_pool = nil)
      @lock_context = lock_context
      @job_id = lock_context.lock_id # Yes, .lock_id is intentional
      @item = lock_context
      @key = SidekiqUniqueJobs::Key.new(key)

      lcfg = lock_context.config
      ttl = lcfg[:ttl]

      @config = OpenStruct.new(
        type: lcfg[:strategy],
        # :ttl may be nil (the safe-navigation below already allows for it), so only convert
        # to milliseconds when a value is present instead of raising NoMethodError on nil.
        pttl: ttl && ttl * 1000,
        timeout: lcfg[:timeout],
        # NOTE(review): this is derived from :ttl rather than :timeout - confirm that is intended.
        "wait_for_lock?": ttl&.positive?,
        lock_info: false,
        limit: lcfg[:limit],
      )

      @redis_pool = redis_pool
    end

    # @return the JIDs reported by SidekiqUniqueJobs::Lock#locked_jids for this locksmith's key
    def locked_jids
      SidekiqUniqueJobs::Lock.new(@key).locked_jids
    end

    # Acquires a lock for the current job id and then releases the lock recorded under +old_jid+.
    #
    # Does nothing (returns nil) when +old_jid+ is already the current job id. The configured
    # limit and the current job id are always restored afterwards, even if an error is raised.
    #
    # @param old_jid [String] the job id whose lock should be released
    # @return the result of +lock+, or nil when no swap was necessary
    def swap_locks(old_jid)
      olimit = lock_context.config[:limit]
      new_jid = @job_id

      return if old_jid == new_jid

      # NB This is quite hacky, but should work
      #
      # Ideally the unlock(old) and lock(new) would be atomic, but that increases the amount of coupling with Sidekiq-Unique-Jobs - right now,
      #   we're using fairly stable (though still internal) SUJ APIs; I fear that writing custom Lua will be significantly more brittle
      #
      # In the general case, we'd only bump limit by 1, but that leaves a potential race-condition when limit is configured > 1:
      #   (Assuming until_and_while_executing, reschedule, limit = 2):
      #   (Workers are performing 2 Jobs, RUN lock count = 2)
      #   Worker 1 pulls Job A
      #   Worker 2 pulls Job B
      #   W1 and W2 both fail to get the runtime lock
      #   W1 and W2 call swap_locks
      #   W1 calls lock(limit+1), lock is granted, lock count becomes limit+1
      #   W2 calls lock(limit+1), lock is denied because count would be limit+2
      #   W1 calls unlock(old_jid)

      # Force creation of another lock, ignoring the limit
      @config.limit = olimit + 100
      result = lock

      # Release the old lock, bringing us back within the limit
      # NOTE(review): the old lock is released even if `lock` returned a falsy result - confirm that is intended.
      @job_id = old_jid
      unlock

      result
    ensure
      @config.limit = olimit
      @job_id = new_jid
    end

    private

    # @return the score to record for the lock, taken from the lock context's +job_score+
    def lock_score
      lock_context.job_score
    end

    # Serializes the lock context's +debug_data+ to JSON.
    # The value is recomputed (and re-assigned to +@lock_info+) on every call.
    #
    # @return [String] JSON document
    def lock_info
      @lock_info = JSON.dump(lock_context.debug_data)
    end

    # def taken?(conn)
    #   v = conn.hexists(key.locked, job_id)
    #   v.is_a?(Numeric) ? v != 0 : v
    # end

    # Memoized Redis version: the value configured on CanvasSync::JobUniqueness when present,
    # otherwise whatever SidekiqUniqueJobs.fetch_redis_version returns.
    def redis_version
      @redis_version ||= CanvasSync::JobUniqueness.config.redis_version || SidekiqUniqueJobs.fetch_redis_version
    end
  end
end