app/models/good_job/execution.rb in good_job-3.5.1 vs app/models/good_job/execution.rb in good_job-3.6.0
- old
+ new
@@ -64,10 +64,11 @@
{ include: queues }
end
end
belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions
+ after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job }
# Get executions with given ActiveJob ID
# @!method active_job_id
# @!scope class
# @param active_job_id [String]
@@ -201,14 +202,14 @@
# @return [ExecutionResult, nil]
# If a job was executed, returns an array with the {Execution} record, the
# return value for the job's +#perform+ method, and the exception the job
# raised, if any (if the job raised, then the second array entry will be
# +nil+). If there were no jobs to execute, returns +nil+.
- def self.perform_with_advisory_lock(parsed_queues: nil)
+ def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
execution = nil
result = nil
- unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |executions|
+ unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true, select_limit: queue_select_limit) do |executions|
execution = executions.first
break if execution.blank?
break :unlocked unless execution&.executable?
result = execution.perform
@@ -294,17 +295,18 @@
result = execute
job_error = result.handled_error || result.unhandled_error
self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error
+ reenqueued = result.retried? || retried_good_job_id.present?
if result.unhandled_error && GoodJob.retry_on_unhandled_error
save!
- elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
+ elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
self.finished_at = Time.current
save!
else
- destroy!
+ destroy_job
end
result
end
@@ -352,10 +354,18 @@
# Time between when this job started and finished
def runtime_latency
(finished_at || Time.zone.now) - performed_at if performed_at
end
+ # Destroys this execution and all executions within the same job
+ def destroy_job
+ @_destroy_job = true
+ destroy!
+ ensure
+ @_destroy_job = false
+ end
+
private
def active_job_data
serialized_params.deep_dup
.tap do |job_data|
@@ -377,10 +387,10 @@
handled_error = value
value = nil
end
handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard
- ExecutionResult.new(value: value, handled_error: handled_error)
+ ExecutionResult.new(value: value, handled_error: handled_error, retried: current_thread.error_on_retry.present?)
rescue StandardError => e
ExecutionResult.new(value: nil, unhandled_error: e)
end
end
end