module CanvasSync::JobUniqueness module Strategy class Base attr_reader :lock_context def initialize(lock_context) @lock_context = lock_context @conflict_strategies = {} end class_attribute :_locks_on, instance_writer: false def self.locks_on(*origins) if origins.present? orgins = Array(origins).map(&:to_sym) self._locks_on = origins else self._locks_on || [:enqueue, :perform] end end def on_enqueue; end def on_perform; end def batch_callback(event, batch_status) if event == :success unlock else unlock_cond = lock_context.config[:unlock_on_failure] if (event == :complete && unlock_cond == :any) || (event == :death && unlock_cond == :death) || (event == :stagnated && unlock_cond == :stagnant) unlock end end end delegate :locked?, to: :locksmith protected def key lock_context.base_key end def wrap_in_batch(&blk) if Thread.current[:unique_jobs_previous_jid] # Ensure we don't re-wrap in a batch when rescheduling return blk.call end batch = CanvasSync::JobBatches::Batch.new batch.context = { uniqueness_lock_key: key, } CanvasSync::JobBatches::Batch::Callback::VALID_CALLBACKS.each do |callback| callback = callback.to_sym batch.on(callback, self.class.to_s + ".internal_batch_callback", { event: callback, lock_strategy: self.class.to_s, lock_key: key, lock_context: lock_context.serialize, }) end CanvasSync::JobUniqueness.logger.debug("Wrapped job in Locking Batch #{batch.bid} for #{key}") batch.jobs do return blk.call end end def self.internal_batch_callback(batch_status, opts) CanvasSync::JobUniqueness.logger.debug("Received Batch(#{batch_status.bid}) callback for #{opts[:lock_strategy]} #{opts[:lock_key]} - #{opts[:event]}") strategy_class = opts[:lock_strategy].constantize lock_context = LockContext.from_serialized(opts[:lock_context]) strategy = strategy_class.new(lock_context) # TODO Should this route through LockContext#handle_lifecycle!? strategy.batch_callback(opts[:event].to_sym, batch_status) end def lock!(purpose, wait: nil) locked = nil if purpose == :enqueue if Thread.current[:unique_jobs_previous_jid].present? locked = locksmith.swap_locks(Thread.current[:unique_jobs_previous_jid]) else locked = locksmith.lock() end elsif purpose == :perform locked = locksmith.execute { lock_context.job_id } end CanvasSync::JobUniqueness.logger.debug { "Requested lock of #{key} for #{purpose} - (#{locked || 'Not Obtained!'})" } raise CouldNotLockError.new(lock_context, source: purpose) if !locked end def unlock() result = locksmith.unlock CanvasSync::JobUniqueness.logger.debug { "Unlocked #{key} - (#{result || 'Not Unlocked!'})" } end def locksmith @locksmith ||= Locksmith.new(key, lock_context) end end end end