app/models/good_job/execution.rb in good_job-3.15.3 vs app/models/good_job/execution.rb in good_job-3.15.4
- old
+ new
@@ -68,14 +68,18 @@
{ include: queues }
end
end
belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions
-
belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions
- after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job }
+ has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent
+ after_destroy lambda {
+ GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4
+ self.class.active_job_id(active_job_id).delete_all
+ }, if: -> { @_destroy_job }
+
# Get executions with given ActiveJob ID
# @!method active_job_id
# @!scope class
# @param active_job_id [String]
# ActiveJob ID
@@ -199,12 +203,16 @@
elsif parsed[:include]
where(queue_name: parsed[:include])
end
end)
- # Construct a GoodJob::Execution from an ActiveJob instance.
def self.build_for_enqueue(active_job, overrides = {})
+ new(**enqueue_args(active_job, overrides))
+ end
+
+ # Construct arguments for GoodJob::Execution from an ActiveJob instance.
+ def self.enqueue_args(active_job, overrides = {})
if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil?
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority.
To opt-in to this behavior now, set `config.good_job.smaller_number_is_higher_priority = true` in your GoodJob initializer or application.rb.
To not opt-in yet, but silence this deprecation warning, set `config.good_job.smaller_number_is_higher_priority = false`.
@@ -216,10 +224,11 @@
queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
priority: active_job.priority || DEFAULT_PRIORITY,
serialized_params: active_job.serialize,
scheduled_at: active_job.scheduled_at,
}
+
execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
current_execution = CurrentThread.execution
@@ -236,11 +245,11 @@
end
execution_args[:cron_key] = CurrentThread.cron_key
execution_args[:cron_at] = CurrentThread.cron_at
end
- new(**execution_args.merge(overrides))
+ execution_args.merge(overrides)
end
# Finds the next eligible Execution, acquire an advisory lock related to it, and
# executes the job.
# @return [ExecutionResult, nil]
@@ -296,39 +305,94 @@
# jobs the caller takes care of the persistence and sets this parameter to `false`
# @return [Execution]
# The new {Execution} instance representing the queued ActiveJob job.
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
- execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
+ current_execution = CurrentThread.execution
- execution.create_with_advisory_lock = create_with_advisory_lock
- instrument_payload[:execution] = execution
+ retried = current_execution && current_execution.active_job_id == active_job.job_id
+ if retried
+ if current_execution.discrete?
+ execution = current_execution
+ execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at }))
+ execution.scheduled_at ||= Time.current
+ execution.performed_at = nil
+ execution.finished_at = nil
+ else
+ execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
+ end
+ else
+ execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
+ execution.make_discrete if discrete_support?
+ end
+ if create_with_advisory_lock
+ if execution.persisted?
+ execution.advisory_lock
+ else
+ execution.create_with_advisory_lock = true
+ end
+ end
+
+ instrument_payload[:execution] = execution
execution.save!
- active_job.provider_job_id = execution.id
- CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
+ CurrentThread.execution.retried_good_job_id = execution.id if retried && !CurrentThread.execution.discrete?
+ active_job.provider_job_id = execution.id
execution
end
end
+ def self.format_error(error)
+ raise ArgumentError unless error.is_a?(Exception)
+
+ [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
+ end
+
# Execute the ActiveJob job this {Execution} represents.
# @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
# exception raised by the job, if any. If the job completed successfully,
# the second array entry (the exception) will be +nil+ and vice versa.
def perform
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
+ discrete_execution = nil
result = GoodJob::CurrentThread.within do |current_thread|
current_thread.reset
current_thread.execution = self
- current_thread.execution_interrupted = performed_at if performed_at
- update!(performed_at: Time.current)
+ if performed_at
+ current_thread.execution_interrupted = performed_at
+ if discrete?
+ interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'"))
+ self.error = interrupt_error_string
+ discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all( # rubocop:disable Rails/SkipsModelValidations
+ error: interrupt_error_string,
+ finished_at: Time.current
+ )
+ end
+ end
+
+ if discrete?
+ transaction do
+ now = Time.current
+ discrete_execution = discrete_executions.create!(
+ job_class: job_class,
+ queue_name: queue_name,
+ serialized_params: serialized_params,
+ scheduled_at: (scheduled_at || created_at),
+ created_at: now
+ )
+ update!(performed_at: now, executions_count: ((executions_count || 0) + 1))
+ end
+ else
+ update!(performed_at: Time.current)
+ end
+
ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
value = ActiveJob::Base.execute(active_job_data)
if value.is_a?(Exception)
handled_error = value
@@ -347,18 +411,46 @@
ExecutionResult.new(value: nil, unhandled_error: e)
end
end
job_error = result.handled_error || result.unhandled_error
- self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error
+ if job_error
+ error_string = self.class.format_error(job_error)
+ self.error = error_string
+ discrete_execution.error = error_string if discrete_execution
+ else
+ self.error = nil
+ end
+
reenqueued = result.retried? || retried_good_job_id.present?
if result.unhandled_error && GoodJob.retry_on_unhandled_error
- save!
+ if discrete_execution
+ transaction do
+ discrete_execution.update!(finished_at: Time.current)
+ update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil)
+ end
+ else
+ save!
+ end
elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present?
- self.finished_at = Time.current
- save!
+ now = Time.current
+ if discrete_execution
+ if reenqueued
+ self.performed_at = nil
+ else
+ self.finished_at = now
+ end
+ discrete_execution.finished_at = now
+ transaction do
+ discrete_execution.save!
+ save!
+ end
+ else
+ self.finished_at = now
+ save!
+ end
else
destroy_job
end
result
@@ -367,9 +459,20 @@
# Tests whether this job is safe to be executed by this thread.
# @return [Boolean]
def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
+ end
+
+ def make_discrete
+ self.is_discrete = true
+ self.id = active_job_id
+ self.job_class = serialized_params['job_class']
+ self.executions_count ||= 0
+
+ current_time = Time.current
+ self.created_at ||= current_time
+ self.scheduled_at ||= current_time
end
# Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
#
# @param ignore_deserialization_errors [Boolean]