app/models/good_job/execution.rb in good_job-3.28.2 vs app/models/good_job/execution.rb in good_job-3.29.3
- old
+ new
@@ -38,11 +38,11 @@
# queue names should match, and dequeue should respect queue order.
# @example
# GoodJob::Execution.queue_parser('-queue1,queue2')
# => { exclude: [ 'queue1', 'queue2' ] }
def self.queue_parser(string)
- string = string.presence || '*'
+ string = string.strip.presence || '*'
case string.first
when '-'
exclude_queues = true
string = string[1..]
@@ -260,19 +260,19 @@
# @return [ExecutionResult, nil]
# If a job was executed, returns an array with the {Execution} record, the
# return value for the job's +#perform+ method, and the exception the job
# raised, if any (if the job raised, then the second array entry will be
# +nil+). If there were no jobs to execute, returns +nil+.
- def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
+ def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil)
execution = nil
result = nil
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |executions|
execution = executions.first
if execution&.executable?
yield(execution) if block_given?
- result = execution.perform
+ result = execution.perform(lock_id: lock_id)
else
execution = nil
yield(nil) if block_given?
end
end
@@ -365,11 +365,11 @@
# Execute the ActiveJob job this {Execution} represents.
# @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
# exception raised by the job, if any. If the job completed successfully,
# the second array entry (the exception) will be +nil+ and vice versa.
- def perform
+ def perform(lock_id:)
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
job_performed_at = Time.current
discrete_execution = nil
@@ -395,21 +395,41 @@
end
end
if discrete?
transaction do
- discrete_execution = discrete_executions.create!(
+ discrete_execution_attrs = {
job_class: job_class,
queue_name: queue_name,
serialized_params: serialized_params,
scheduled_at: (scheduled_at || created_at),
- created_at: job_performed_at
- )
- update!(performed_at: job_performed_at, executions_count: ((executions_count || 0) + 1))
+ created_at: job_performed_at,
+ }
+ discrete_execution_attrs[:process_id] = lock_id if GoodJob::DiscreteExecution.columns_hash.key?("process_id")
+
+ execution_attrs = {
+ performed_at: job_performed_at,
+ executions_count: ((executions_count || 0) + 1),
+ }
+ if GoodJob::Execution.columns_hash.key?("locked_by_id")
+ execution_attrs[:locked_by_id] = lock_id
+ execution_attrs[:locked_at] = Time.current
+ end
+
+ discrete_execution = discrete_executions.create!(discrete_execution_attrs)
+ update!(execution_attrs)
end
else
- update!(performed_at: job_performed_at)
+ execution_attrs = {
+ performed_at: job_performed_at,
+ }
+ if GoodJob::Execution.columns_hash.key?("locked_by_id")
+ execution_attrs[:locked_by_id] = lock_id
+ execution_attrs[:locked_at] = Time.current
+ end
+
+ update!(execution_attrs)
end
ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
value = ActiveJob::Base.execute(active_job_data)
@@ -448,10 +468,14 @@
instrument_payload[:unhandled_error] = e
ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event)
end
end
- job_attributes = {}
+ job_attributes = if self.class.columns_hash.key?("locked_by_id")
+ { locked_by_id: nil, locked_at: nil }
+ else
+ {}
+ end
job_error = result.handled_error || result.unhandled_error
if job_error
error_string = self.class.format_error(job_error)