app/models/good_job/execution.rb in good_job-3.9.0 vs app/models/good_job/execution.rb in good_job-3.10.0
- old
+ new
@@ -18,12 +18,16 @@
DEFAULT_PRIORITY = 0
self.table_name = 'good_jobs'
self.advisory_lockable_column = 'active_job_id'
+ define_model_callbacks :perform
define_model_callbacks :perform_unlocked, only: :after
+ set_callback :perform, :around, :reset_batch_values
+ set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch
+
# Parse a string representing a group of queues into a more readable data
# structure.
# @param string [String] Queue string
# @return [Hash]
# How to match a given queue. It can have the following keys and values:
@@ -63,10 +67,13 @@
else
{ include: queues }
end
end
+ belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions
+ belongs_to :batch_callback, class_name: 'GoodJob::Batch', optional: true
+
belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions
after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job }
# Get executions with given ActiveJob ID
# @!method active_job_id
@@ -204,18 +211,28 @@
queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
priority: active_job.priority || DEFAULT_PRIORITY,
serialized_params: active_job.serialize,
scheduled_at: active_job.scheduled_at,
}
-
execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
- if CurrentThread.cron_key
+ reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
+ current_execution = CurrentThread.execution
+
+ if reenqueued_current_execution
+ if GoodJob::BatchRecord.migrated?
+ execution_args[:batch_id] = current_execution.batch_id
+ execution_args[:batch_callback_id] = current_execution.batch_callback_id
+ end
+ execution_args[:cron_key] = current_execution.cron_key
+ else
+ if GoodJob::BatchRecord.migrated?
+ execution_args[:batch_id] = GoodJob::Batch.current_batch_id
+ execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id
+ end
execution_args[:cron_key] = CurrentThread.cron_key
execution_args[:cron_at] = CurrentThread.cron_at
- elsif CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
- execution_args[:cron_key] = CurrentThread.execution.cron_key
end
new(**execution_args.merge(overrides))
end
@@ -281,11 +298,10 @@
execution.create_with_advisory_lock = create_with_advisory_lock
instrument_payload[:execution] = execution
execution.save!
active_job.provider_job_id = execution.id
-
CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
execution
end
end
@@ -294,31 +310,33 @@
# @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
# exception raised by the job, if any. If the job completed successfully,
# the second array entry (the exception) will be +nil+ and vice versa.
def perform
- raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
+ run_callbacks(:perform) do
+ raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
- self.performed_at = Time.current
- save! if GoodJob.preserve_job_records
+ self.performed_at = Time.current
+ save! if GoodJob.preserve_job_records
- result = execute
+ result = execute
- job_error = result.handled_error || result.unhandled_error
- self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error
+ job_error = result.handled_error || result.unhandled_error
+ self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error
- reenqueued = result.retried? || retried_good_job_id.present?
- if result.unhandled_error && GoodJob.retry_on_unhandled_error
- save!
- elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present?
- self.finished_at = Time.current
- save!
- else
- destroy_job
- end
+ reenqueued = result.retried? || retried_good_job_id.present?
+ if result.unhandled_error && GoodJob.retry_on_unhandled_error
+ save!
+ elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present?
+ self.finished_at = Time.current
+ save!
+ else
+ destroy_job
+ end
- result
+ result
+ end
end
# Tests whether this job is safe to be executed by this thread.
# @return [Boolean]
def executable?
@@ -408,8 +426,16 @@
ExecutionResult.new(value: value, handled_error: handled_error, retried: current_thread.error_on_retry.present?)
rescue StandardError => e
ExecutionResult.new(value: nil, unhandled_error: e)
end
end
+ end
+
+ def reset_batch_values(&block)
+ GoodJob::Batch.within_thread(batch_id: nil, batch_callback_id: nil, &block)
+ end
+
+ def continue_discard_or_finish_batch
+ batch._continue_discard_or_finish(self) if GoodJob::BatchRecord.migrated? && batch.present?
end
end
end