lib/good_job/job.rb in good_job-1.9.1 vs lib/good_job/job.rb in good_job-1.9.2
- old
+ new
@@ -1,9 +1,10 @@
module GoodJob
- #
- # Represents a request to perform an +ActiveJob+ job.
- #
+ # ActiveRecord model that represents an +ActiveJob+ job.
+ # Parent class can be configured with +GoodJob.active_record_parent_class+.
+ # @!parse
+ # class Job < ActiveRecord::Base; end
class Job < Object.const_get(GoodJob.active_record_parent_class)
include Lockable
# Raised if something attempts to execute a previously completed Job again.
PreviouslyPerformedError = Class.new(StandardError)
@@ -17,10 +18,11 @@
attr_readonly :serialized_params
# Parse a string representing a group of queues into a more readable data
# structure.
+ # @param string [String] Queue string
# @return [Hash]
# How to match a given queue. It can have the following keys and values:
# - +{ all: true }+ indicates that all queues match.
# - +{ exclude: Array<String> }+ indicates the listed queue names should
# not match.
@@ -132,33 +134,30 @@
query
end)
# Finds the next eligible Job, acquire an advisory lock related to it, and
# executes the job.
- # @return [Array<(GoodJob::Job, Object, Exception)>, nil]
+ # @return [ExecutionResult, nil]
# If a job was executed, returns an array with the {Job} record, the
# return value for the job's +#perform+ method, and the exception the job
# raised, if any (if the job raised, then the second array entry will be
# +nil+). If there were no jobs to execute, returns +nil+.
def self.perform_with_advisory_lock
- good_job = nil
- result = nil
- error = nil
-
unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
# TODO: Determine why some records are fetched without an advisory lock at all
break unless good_job&.executable?
- result, error = good_job.perform
+ good_job.perform
end
-
- [good_job, result, error] if good_job
end
# Fetches the scheduled execution time of the next eligible Job(s).
- # @return [Array<(DateTime)>]
+ # @param after [DateTime]
+ # @param limit [Integer]
+ # @param now_limit [Integer, nil]
+ # @return [Array<DateTime>]
def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
query = advisory_unlocked.unfinished.schedule_ordered
after ||= Time.current
after_query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after)
@@ -180,11 +179,10 @@
# @param create_with_advisory_lock [Boolean]
# Whether to establish a lock on the {Job} record after it is created.
# @return [Job]
# The new {Job} instance representing the queued ActiveJob job.
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
- good_job = nil
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
good_job = GoodJob::Job.new(
queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
priority: active_job.priority || DEFAULT_PRIORITY,
serialized_params: active_job.serialize,
@@ -194,71 +192,69 @@
instrument_payload[:good_job] = good_job
good_job.save!
active_job.provider_job_id = good_job.id
- end
- good_job
+ good_job
+ end
end
# Execute the ActiveJob job this {Job} represents.
- # @return [Array<(Object, Exception)>]
+ # @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
# exception raised by the job, if any. If the job completed successfully,
# the second array entry (the exception) will be +nil+ and vice versa.
def perform
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
- GoodJob::CurrentExecution.reset
-
self.performed_at = Time.current
save! if GoodJob.preserve_job_records
- result, unhandled_error = execute
+ result = execute
- result_error = nil
- if result.is_a?(Exception)
- result_error = result
- result = nil
- end
-
- job_error = unhandled_error ||
- result_error ||
- GoodJob::CurrentExecution.error_on_retry ||
- GoodJob::CurrentExecution.error_on_discard
-
+ job_error = result.handled_error || result.unhandled_error
self.error = "#{job_error.class}: #{job_error.message}" if job_error
- if unhandled_error && GoodJob.retry_on_unhandled_error
+ if result.unhandled_error && GoodJob.retry_on_unhandled_error
save!
- elsif GoodJob.preserve_job_records == true || (unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
+ elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
self.finished_at = Time.current
save!
else
destroy!
end
- [result, job_error]
+ result
end
# Tests whether this job is safe to be executed by this thread.
# @return [Boolean]
def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end
private
+ # @return [GoodJob::ExecutionResult]
def execute
params = serialized_params.merge(
"provider_job_id" => id
)
+ GoodJob::CurrentExecution.reset
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do
- [ActiveJob::Base.execute(params), nil]
+ value = ActiveJob::Base.execute(params)
+
+ if value.is_a?(Exception)
+ handled_error = value
+ value = nil
+ end
+ handled_error ||= GoodJob::CurrentExecution.error_on_retry || GoodJob::CurrentExecution.error_on_discard
+
+ ExecutionResult.new(value: value, handled_error: handled_error)
+ rescue StandardError => e
+ ExecutionResult.new(value: nil, unhandled_error: e)
end
- rescue StandardError => e
- [nil, e]
end
end
end