app/models/good_job/execution.rb in good_job-3.9.0 vs app/models/good_job/execution.rb in good_job-3.10.0

- old
+ new

@@ -18,12 +18,16 @@ DEFAULT_PRIORITY = 0 self.table_name = 'good_jobs' self.advisory_lockable_column = 'active_job_id' + define_model_callbacks :perform define_model_callbacks :perform_unlocked, only: :after + set_callback :perform, :around, :reset_batch_values + set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch + # Parse a string representing a group of queues into a more readable data # structure. # @param string [String] Queue string # @return [Hash] # How to match a given queue. It can have the following keys and values: @@ -63,10 +67,13 @@ else { include: queues } end end + belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions + belongs_to :batch_callback, class_name: 'GoodJob::Batch', optional: true + belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job } # Get executions with given ActiveJob ID # @!method active_job_id @@ -204,18 +211,28 @@ queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, priority: active_job.priority || DEFAULT_PRIORITY, serialized_params: active_job.serialize, scheduled_at: active_job.scheduled_at, } - execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) - if CurrentThread.cron_key + reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id + current_execution = CurrentThread.execution + + if reenqueued_current_execution + if GoodJob::BatchRecord.migrated? + execution_args[:batch_id] = current_execution.batch_id + execution_args[:batch_callback_id] = current_execution.batch_callback_id + end + execution_args[:cron_key] = current_execution.cron_key + else + if GoodJob::BatchRecord.migrated? + execution_args[:batch_id] = GoodJob::Batch.current_batch_id + execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id + end execution_args[:cron_key] = CurrentThread.cron_key execution_args[:cron_at] = CurrentThread.cron_at - elsif CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id - execution_args[:cron_key] = CurrentThread.execution.cron_key end new(**execution_args.merge(overrides)) end @@ -281,11 +298,10 @@ execution.create_with_advisory_lock = create_with_advisory_lock instrument_payload[:execution] = execution execution.save! active_job.provider_job_id = execution.id - CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id execution end end @@ -294,31 +310,33 @@ # @return [ExecutionResult] # An array of the return value of the job's +#perform+ method and the # exception raised by the job, if any. If the job completed successfully, # the second array entry (the exception) will be +nil+ and vice versa. def perform - raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at + run_callbacks(:perform) do + raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at - self.performed_at = Time.current - save! if GoodJob.preserve_job_records + self.performed_at = Time.current + save! if GoodJob.preserve_job_records - result = execute + result = execute - job_error = result.handled_error || result.unhandled_error - self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error + job_error = result.handled_error || result.unhandled_error + self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error - reenqueued = result.retried? || retried_good_job_id.present? - if result.unhandled_error && GoodJob.retry_on_unhandled_error - save! - elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present? - self.finished_at = Time.current - save! - else - destroy_job - end + reenqueued = result.retried? || retried_good_job_id.present? + if result.unhandled_error && GoodJob.retry_on_unhandled_error + save! + elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present? + self.finished_at = Time.current + save! + else + destroy_job + end - result + result + end end # Tests whether this job is safe to be executed by this thread. # @return [Boolean] def executable? @@ -408,8 +426,16 @@ ExecutionResult.new(value: value, handled_error: handled_error, retried: current_thread.error_on_retry.present?) rescue StandardError => e ExecutionResult.new(value: nil, unhandled_error: e) end end + end + + def reset_batch_values(&block) + GoodJob::Batch.within_thread(batch_id: nil, batch_callback_id: nil, &block) + end + + def continue_discard_or_finish_batch + batch._continue_discard_or_finish(self) if GoodJob::BatchRecord.migrated? && batch.present? end end end