# frozen_string_literal: true require_relative 'yaml_loader' require_relative 'compatibility' module Delayed module JobGroups class JobGroup < ActiveRecord::Base self.table_name = "#{ActiveRecord::Base.table_name_prefix}delayed_job_groups" if Compatibility.rails_7_1_or_greater? serialize :on_completion_job, coder: YAML, yaml: { unsafe_load: true } serialize :on_completion_job_options, coder: YAML, type: Hash serialize :on_cancellation_job, coder: YAML, yaml: { unsafe_load: true } serialize :on_cancellation_job_options, coder: YAML, type: Hash else serialize :on_completion_job, Delayed::JobGroups::YamlLoader serialize :on_completion_job_options, Hash serialize :on_cancellation_job, Delayed::JobGroups::YamlLoader serialize :on_cancellation_job_options, Hash end validates :queueing_complete, :blocked, :failure_cancels_group, inclusion: [true, false] has_many :active_jobs, -> { where(failed_at: nil) }, class_name: '::Delayed::Job' # Only delete dependent jobs that are unlocked so we can determine if there are in-flight jobs # for canceled job groups has_many :queued_jobs, -> { where(failed_at: nil, locked_by: nil) }, class_name: '::Delayed::Job', dependent: :delete_all scope :ready, -> { where(queueing_complete: true, blocked: false) } scope :with_no_open_jobs, -> do where("NOT EXISTS (#{Delayed::Job.where('delayed_jobs.job_group_id = delayed_job_groups.id').to_sql})") end def mark_queueing_complete with_lock do raise 'JobGroup has already completed queueing' if queueing_complete? update_column(:queueing_complete, true) complete if ready_for_completion? end end def enqueue(job, options = {}) options = options.merge(job_group_id: id) options[:blocked] = blocked? Delayed::Job.enqueue(job, options) end def unblock return unless blocked? with_lock do update_column(:blocked, false) active_jobs.update_all(blocked: false, run_at: Delayed::Job.db_time_now) complete if ready_for_completion? end end def cancel Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job destroy end def check_for_completion(skip_pending_jobs_check: false) self.class.check_for_completion(id, skip_pending_jobs_check: skip_pending_jobs_check) end def self.check_for_completion(job_group_id, skip_pending_jobs_check: false) # Optimization to avoid loading and locking the JobGroup when the group # still has pending jobs return if !skip_pending_jobs_check && has_pending_jobs?(job_group_id) transaction do # The first completed job to notice the job group's queue count has dropped to # zero will queue the job group's completion job and destroy the job group so # other jobs need to handle the job group having been destroyed already. job_group = where(id: job_group_id).lock(true).first job_group.send(:complete) if job_group&.send(:ready_for_completion?) end end def self.has_pending_jobs?(job_group_ids) # rubocop:disable Naming/PredicateName job_group_ids = Array(job_group_ids) return false if job_group_ids.empty? Delayed::Job.where(job_group_id: job_group_ids, failed_at: nil).exists? end private def ready_for_completion? queueing_complete? && !JobGroup.has_pending_jobs?(id) && !blocked? end def complete Delayed::Job.enqueue(on_completion_job, on_completion_job_options || {}) if on_completion_job destroy end end end end