app/models/good_job/execution.rb in good_job-3.28.2 vs app/models/good_job/execution.rb in good_job-3.29.3

- old
+ new

@@ -38,11 +38,11 @@ # queue names should match, and dequeue should respect queue order. # @example # GoodJob::Execution.queue_parser('-queue1,queue2') # => { exclude: [ 'queue1', 'queue2' ] } def self.queue_parser(string) - string = string.presence || '*' + string = string.strip.presence || '*' case string.first when '-' exclude_queues = true string = string[1..] @@ -260,19 +260,19 @@ # @return [ExecutionResult, nil] # If a job was executed, returns an array with the {Execution} record, the # return value for the job's +#perform+ method, and the exception the job # raised, if any (if the job raised, then the second array entry will be # +nil+). If there were no jobs to execute, returns +nil+. - def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil) + def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) execution = nil result = nil unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |executions| execution = executions.first if execution&.executable? yield(execution) if block_given? - result = execution.perform + result = execution.perform(lock_id: lock_id) else execution = nil yield(nil) if block_given? end end @@ -365,11 +365,11 @@ # Execute the ActiveJob job this {Execution} represents. # @return [ExecutionResult] # An array of the return value of the job's +#perform+ method and the # exception raised by the job, if any. If the job completed successfully, # the second array entry (the exception) will be +nil+ and vice versa. - def perform + def perform(lock_id:) run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at job_performed_at = Time.current discrete_execution = nil @@ -395,21 +395,41 @@ end end if discrete? transaction do - discrete_execution = discrete_executions.create!( + discrete_execution_attrs = { job_class: job_class, queue_name: queue_name, serialized_params: serialized_params, scheduled_at: (scheduled_at || created_at), - created_at: job_performed_at - ) - update!(performed_at: job_performed_at, executions_count: ((executions_count || 0) + 1)) + created_at: job_performed_at, + } + discrete_execution_attrs[:process_id] = lock_id if GoodJob::DiscreteExecution.columns_hash.key?("process_id") + + execution_attrs = { + performed_at: job_performed_at, + executions_count: ((executions_count || 0) + 1), + } + if GoodJob::Execution.columns_hash.key?("locked_by_id") + execution_attrs[:locked_by_id] = lock_id + execution_attrs[:locked_at] = Time.current + end + + discrete_execution = discrete_executions.create!(discrete_execution_attrs) + update!(execution_attrs) end else - update!(performed_at: job_performed_at) + execution_attrs = { + performed_at: job_performed_at, + } + if GoodJob::Execution.columns_hash.key?("locked_by_id") + execution_attrs[:locked_by_id] = lock_id + execution_attrs[:locked_at] = Time.current + end + + update!(execution_attrs) end ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| value = ActiveJob::Base.execute(active_job_data) @@ -448,10 +468,14 @@ instrument_payload[:unhandled_error] = e ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event) end end - job_attributes = {} + job_attributes = if self.class.columns_hash.key?("locked_by_id") + { locked_by_id: nil, locked_at: nil } + else + {} + end job_error = result.handled_error || result.unhandled_error if job_error error_string = self.class.format_error(job_error)