lib/models/good_job/job.rb in good_job-2.99.0 vs lib/models/good_job/job.rb in good_job-3.0.0

- old
+ new

@@ -1,11 +1,224 @@ # frozen_string_literal: true module GoodJob - # @deprecated Use {GoodJob::Execution} instead. - class Job < Execution - after_initialize do |_job| - ActiveSupport::Deprecation.warn( - "The `GoodJob::Job` class name is deprecated. Replace with `GoodJob::Execution`." - ) + # ActiveRecord model that represents an +ActiveJob+ job. + # There is not a table in the database whose discrete rows represents "Jobs". + # The +good_jobs+ table is a table of individual {GoodJob::Execution}s that share the same +active_job_id+. + # A single row from the +good_jobs+ table of executions is fetched to represent an Job + class Job < BaseRecord + include Filterable + include Lockable + + # Raised when an inappropriate action is applied to a Job based on its state. + ActionForStateMismatchError = Class.new(StandardError) + # Raised when an action requires GoodJob to be the ActiveJob Queue Adapter but GoodJob is not. + AdapterNotGoodJobError = Class.new(StandardError) + # Attached to a Job's Execution when the Job is discarded. + DiscardJobError = Class.new(StandardError) + + class << self + delegate :table_name, to: GoodJob::Execution + + def table_name=(_value) + raise NotImplementedError, 'Assign GoodJob::Execution.table_name directly' + end + end + + self.primary_key = 'active_job_id' + self.advisory_lockable_column = 'active_job_id' + + has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job + + # Only the most-recent unretried execution represents a "Job" + default_scope { where(retried_good_job_id: nil) } + + # Get Jobs with given class name + # @!method job_class + # @!scope class + # @param string [String] Execution class name + # @return [ActiveRecord::Relation] + scope :job_class, ->(job_class) { where("serialized_params->>'job_class' = ?", job_class) } + + # Get Jobs finished before the given timestamp. + # @!method finished_before(timestamp) + # @!scope class + # @param timestamp (DateTime, Time) + # @return [ActiveRecord::Relation] + scope :finished_before, ->(timestamp) { where(arel_table['finished_at'].lteq(timestamp)) } + + # First execution will run in the future + scope :scheduled, -> { where(finished_at: nil).where('COALESCE(scheduled_at, created_at) > ?', DateTime.current).where("(serialized_params->>'executions')::integer < 2") } + # Execution errored, will run in the future + scope :retried, -> { where(finished_at: nil).where('COALESCE(scheduled_at, created_at) > ?', DateTime.current).where("(serialized_params->>'executions')::integer > 1") } + # Immediate/Scheduled time to run has passed, waiting for an available thread run + scope :queued, -> { where(finished_at: nil).where('COALESCE(scheduled_at, created_at) <= ?', DateTime.current).joins_advisory_locks.where(pg_locks: { locktype: nil }) } + # Advisory locked and executing + scope :running, -> { where(finished_at: nil).joins_advisory_locks.where.not(pg_locks: { locktype: nil }) } + # Completed executing successfully + scope :finished, -> { not_discarded.where.not(finished_at: nil) } + # Errored but will not be retried + scope :discarded, -> { where.not(finished_at: nil).where.not(error: nil) } + # Not errored + scope :not_discarded, -> { where(error: nil) } + + # The job's ActiveJob UUID + # @return [String] + def id + active_job_id + end + + # The ActiveJob job class, as a string + # @return [String] + def job_class + serialized_params['job_class'] + end + + # The status of the Job, based on the state of its most recent execution. + # @return [Symbol] + delegate :status, :last_status_at, to: :head_execution + + # This job's most recent {Execution} + # @param reload [Booelan] whether to reload executions + # @return [Execution] + def head_execution(reload: false) + executions.reload if reload + executions.load # memoize the results + executions.last + end + + # This job's initial/oldest {Execution} + # @return [Execution] + def tail_execution + executions.first + end + + # The number of times this job has been executed, according to ActiveJob's serialized state. + # @return [Numeric] + def executions_count + aj_count = head_execution.serialized_params.fetch('executions', 0) + # The execution count within serialized_params is not updated + # once the underlying execution has been executed. + if status.in? [:discarded, :finished, :running] + aj_count + 1 + else + aj_count + end + end + + # The number of times this job has been executed, according to the number of GoodJob {Execution} records. + # @return [Numeric] + def preserved_executions_count + executions.size + end + + # The most recent error message. + # If the job has been retried, the error will be fetched from the previous {Execution} record. + # @return [String] + def recent_error + head_execution.error || executions[-2]&.error + end + + # Tests whether the job is being executed right now. + # @return [Boolean] + def running? + # Avoid N+1 Query: `.includes_advisory_locks` + if has_attribute?(:locktype) + self['locktype'].present? + else + advisory_locked? + end + end + + # Retry a job that has errored and been discarded. + # This action will create a new {Execution} record for the job. + # @return [ActiveJob::Base] + def retry_job + with_advisory_lock do + execution = head_execution(reload: true) + active_job = execution.active_job + + raise AdapterNotGoodJobError unless active_job.class.queue_adapter.is_a? GoodJob::Adapter + raise ActionForStateMismatchError if execution.finished_at.blank? || execution.error.blank? + + # Update the executions count because the previous execution will not have been preserved + # Do not update `exception_executions` because that comes from rescue_from's arguments + active_job.executions = (active_job.executions || 0) + 1 + + new_active_job = nil + GoodJob::CurrentThread.within do |current_thread| + current_thread.execution = execution + + execution.class.transaction(joinable: false, requires_new: true) do + new_active_job = active_job.retry_job(wait: 0, error: execution.error) + execution.save + end + end + new_active_job + end + end + + # Discard a job so that it will not be executed further. + # This action will add a {DiscardJobError} to the job's {Execution} and mark it as finished. + # @return [void] + def discard_job(message) + with_advisory_lock do + execution = head_execution(reload: true) + active_job = execution.active_job + + raise ActionForStateMismatchError if execution.finished_at.present? + + job_error = GoodJob::Job::DiscardJobError.new(message) + + update_execution = proc do + execution.update( + finished_at: Time.current, + error: [job_error.class, GoodJob::Execution::ERROR_MESSAGE_SEPARATOR, job_error.message].join + ) + end + + if active_job.respond_to?(:instrument) + active_job.send :instrument, :discard, error: job_error, &update_execution + else + update_execution.call + end + end + end + + # Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread. + # @param scheduled_at [DateTime, Time] When to reschedule the job + # @return [void] + def reschedule_job(scheduled_at = Time.current) + with_advisory_lock do + execution = head_execution(reload: true) + + raise ActionForStateMismatchError if execution.finished_at.present? + + execution = head_execution(reload: true) + execution.update(scheduled_at: scheduled_at) + end + end + + # Destroy all of a discarded or finished job's executions from the database so that it will no longer appear on the dashboard. + # @return [void] + def destroy_job + with_advisory_lock do + execution = head_execution(reload: true) + + raise ActionForStateMismatchError if execution.finished_at.blank? + + destroy + end + end + + # Utility method to determine which execution record is used to represent this job + # @return [String] + def _execution_id + attributes['id'] + end + + # Utility method to test whether this job's underlying attributes represents its most recent execution. + # @return [Boolean] + def _head? + _execution_id == head_execution(reload: true).id end end end