app/models/good_job/execution.rb in good_job-3.24.0 vs app/models/good_job/execution.rb in good_job-3.25.0
- old
+ new
@@ -341,12 +341,14 @@
instrument_payload[:execution] = execution
execution.save!
if retried
- CurrentThread.execution_retried = true
+ CurrentThread.execution_retried = execution
CurrentThread.execution.retried_good_job_id = execution.id unless current_execution.discrete?
+ else
+ CurrentThread.execution_retried = nil
end
active_job.provider_job_id = execution.id
execution
end
@@ -365,46 +367,47 @@
# the second array entry (the exception) will be +nil+ and vice versa.
def perform
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
+ job_performed_at = Time.current
discrete_execution = nil
result = GoodJob::CurrentThread.within do |current_thread|
current_thread.reset
current_thread.execution = self
- if performed_at
- current_thread.execution_interrupted = performed_at
+ existing_performed_at = performed_at
+ if existing_performed_at
+ current_thread.execution_interrupted = existing_performed_at
if discrete?
- interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'"))
+ interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'"))
self.error = interrupt_error_string
self.error_event = ERROR_EVENT_INTERRUPTED if self.class.error_event_migrated?
discrete_execution_attrs = {
error: interrupt_error_string,
- finished_at: Time.current,
+ finished_at: job_performed_at,
}
discrete_execution_attrs[:error_event] = GoodJob::ErrorEvents::ERROR_EVENT_ENUMS[GoodJob::ErrorEvents::ERROR_EVENT_INTERRUPTED] if self.class.error_event_migrated?
discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all(discrete_execution_attrs) # rubocop:disable Rails/SkipsModelValidations
end
end
if discrete?
transaction do
- now = Time.current
discrete_execution = discrete_executions.create!(
job_class: job_class,
queue_name: queue_name,
serialized_params: serialized_params,
scheduled_at: (scheduled_at || created_at),
- created_at: now
+ created_at: job_performed_at
)
- update!(performed_at: now, executions_count: ((executions_count || 0) + 1))
+ update!(performed_at: job_performed_at, executions_count: ((executions_count || 0) + 1))
end
else
- update!(performed_at: Time.current)
+ update!(performed_at: job_performed_at)
end
ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
value = ActiveJob::Base.execute(active_job_data)
@@ -425,11 +428,11 @@
end
instrument_payload.merge!(
value: value,
handled_error: handled_error,
- retried: current_thread.execution_retried,
+ retried: current_thread.execution_retried.present?,
error_event: error_event
)
ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried: current_thread.execution_retried)
rescue StandardError => e
error_event = if e.is_a?(GoodJob::InterruptError)
@@ -443,51 +446,53 @@
instrument_payload[:unhandled_error] = e
ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event)
end
end
- job_error = result.handled_error || result.unhandled_error
+ job_attributes = {}
+ job_error = result.handled_error || result.unhandled_error
if job_error
error_string = self.class.format_error(job_error)
- self.error = error_string
- self.error_event = result.error_event if self.class.error_event_migrated?
+
+ job_attributes[:error] = error_string
+ job_attributes[:error_event] = result.error_event if self.class.error_event_migrated?
if discrete_execution
discrete_execution.error = error_string
- discrete_execution.error_event = result.error_event if discrete_execution.class.error_event_migrated?
+ discrete_execution.error_event = result.error_event
end
else
- self.error = nil
- self.error_event = nil if self.class.error_event_migrated?
+ job_attributes[:error] = nil
+ job_attributes[:error_event] = nil
end
+ job_attributes.delete(:error_event) unless self.class.error_event_migrated?
- reenqueued = result.retried? || retried_good_job_id.present?
- if result.unhandled_error && GoodJob.retry_on_unhandled_error
+ job_finished_at = Time.current
+ job_attributes[:finished_at] = job_finished_at
+ discrete_execution.finished_at = job_finished_at if discrete_execution
+
+ retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error
+ reenqueued = result.retried? || retried_good_job_id.present? || retry_unhandled_error
+ if reenqueued
if discrete_execution
- transaction do
- discrete_execution.update!(finished_at: Time.current)
- update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil)
- end
+ job_attributes[:performed_at] = nil
+ job_attributes[:finished_at] = nil
else
- save!
+ job_attributes[:retried_good_job_id] = retried_good_job_id
+ job_attributes[:finished_at] = nil if retry_unhandled_error
end
- elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present?
- now = Time.current
+ end
+
+ preserve_unhandled = (result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error))
+ if GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present?
if discrete_execution
- if reenqueued
- self.performed_at = nil
- else
- self.finished_at = now
- end
- discrete_execution.finished_at = now
transaction do
discrete_execution.save!
- save!
+ update!(job_attributes)
end
else
- self.finished_at = now
- save!
+ update!(job_attributes)
end
else
destroy_job
end
@@ -552,9 +557,15 @@
def destroy_job
@_destroy_job = true
destroy!
ensure
@_destroy_job = false
+ end
+
+ def job_state
+ state = { queue_name: queue_name }
+ state[:scheduled_at] = scheduled_at if scheduled_at
+ state
end
private
def reset_batch_values(&block)