app/models/good_job/execution.rb in good_job-3.24.0 vs app/models/good_job/execution.rb in good_job-3.25.0

- old
+ new

@@ -341,12 +341,14 @@ instrument_payload[:execution] = execution execution.save! if retried - CurrentThread.execution_retried = true + CurrentThread.execution_retried = execution CurrentThread.execution.retried_good_job_id = execution.id unless current_execution.discrete? + else + CurrentThread.execution_retried = nil end active_job.provider_job_id = execution.id execution end @@ -365,46 +367,47 @@ # the second array entry (the exception) will be +nil+ and vice versa. def perform run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at + job_performed_at = Time.current discrete_execution = nil result = GoodJob::CurrentThread.within do |current_thread| current_thread.reset current_thread.execution = self - if performed_at - current_thread.execution_interrupted = performed_at + existing_performed_at = performed_at + if existing_performed_at + current_thread.execution_interrupted = existing_performed_at if discrete? - interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'")) + interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'")) self.error = interrupt_error_string self.error_event = ERROR_EVENT_INTERRUPTED if self.class.error_event_migrated? discrete_execution_attrs = { error: interrupt_error_string, - finished_at: Time.current, + finished_at: job_performed_at, } discrete_execution_attrs[:error_event] = GoodJob::ErrorEvents::ERROR_EVENT_ENUMS[GoodJob::ErrorEvents::ERROR_EVENT_INTERRUPTED] if self.class.error_event_migrated? discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all(discrete_execution_attrs) # rubocop:disable Rails/SkipsModelValidations end end if discrete? transaction do - now = Time.current discrete_execution = discrete_executions.create!( job_class: job_class, queue_name: queue_name, serialized_params: serialized_params, scheduled_at: (scheduled_at || created_at), - created_at: now + created_at: job_performed_at ) - update!(performed_at: now, executions_count: ((executions_count || 0) + 1)) + update!(performed_at: job_performed_at, executions_count: ((executions_count || 0) + 1)) end else - update!(performed_at: Time.current) + update!(performed_at: job_performed_at) end ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| value = ActiveJob::Base.execute(active_job_data) @@ -425,11 +428,11 @@ end instrument_payload.merge!( value: value, handled_error: handled_error, - retried: current_thread.execution_retried, + retried: current_thread.execution_retried.present?, error_event: error_event ) ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried: current_thread.execution_retried) rescue StandardError => e error_event = if e.is_a?(GoodJob::InterruptError) @@ -443,51 +446,53 @@ instrument_payload[:unhandled_error] = e ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event) end end - job_error = result.handled_error || result.unhandled_error + job_attributes = {} + job_error = result.handled_error || result.unhandled_error if job_error error_string = self.class.format_error(job_error) - self.error = error_string - self.error_event = result.error_event if self.class.error_event_migrated? + + job_attributes[:error] = error_string + job_attributes[:error_event] = result.error_event if self.class.error_event_migrated? if discrete_execution discrete_execution.error = error_string - discrete_execution.error_event = result.error_event if discrete_execution.class.error_event_migrated? + discrete_execution.error_event = result.error_event end else - self.error = nil - self.error_event = nil if self.class.error_event_migrated? + job_attributes[:error] = nil + job_attributes[:error_event] = nil end + job_attributes.delete(:error_event) unless self.class.error_event_migrated? - reenqueued = result.retried? || retried_good_job_id.present? - if result.unhandled_error && GoodJob.retry_on_unhandled_error + job_finished_at = Time.current + job_attributes[:finished_at] = job_finished_at + discrete_execution.finished_at = job_finished_at if discrete_execution + + retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error + reenqueued = result.retried? || retried_good_job_id.present? || retry_unhandled_error + if reenqueued if discrete_execution - transaction do - discrete_execution.update!(finished_at: Time.current) - update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil) - end + job_attributes[:performed_at] = nil + job_attributes[:finished_at] = nil else - save! + job_attributes[:retried_good_job_id] = retried_good_job_id + job_attributes[:finished_at] = nil if retry_unhandled_error end - elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present? - now = Time.current + end + + preserve_unhandled = (result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error)) + if GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present? if discrete_execution - if reenqueued - self.performed_at = nil - else - self.finished_at = now - end - discrete_execution.finished_at = now transaction do discrete_execution.save! - save! + update!(job_attributes) end else - self.finished_at = now - save! + update!(job_attributes) end else destroy_job end @@ -552,9 +557,15 @@ def destroy_job @_destroy_job = true destroy! ensure @_destroy_job = false + end + + def job_state + state = { queue_name: queue_name } + state[:scheduled_at] = scheduled_at if scheduled_at + state end private def reset_batch_values(&block)