app/models/good_job/execution.rb in good_job-3.15.3 vs app/models/good_job/execution.rb in good_job-3.15.4

- old
+ new

@@ -68,14 +68,18 @@ { include: queues } end end belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions - belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions - after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job } + has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent + after_destroy lambda { + GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4 + self.class.active_job_id(active_job_id).delete_all + }, if: -> { @_destroy_job } + # Get executions with given ActiveJob ID # @!method active_job_id # @!scope class # @param active_job_id [String] # ActiveJob ID @@ -199,12 +203,16 @@ elsif parsed[:include] where(queue_name: parsed[:include]) end end) - # Construct a GoodJob::Execution from an ActiveJob instance. def self.build_for_enqueue(active_job, overrides = {}) + new(**enqueue_args(active_job, overrides)) + end + + # Construct arguments for GoodJob::Execution from an ActiveJob instance. + def self.enqueue_args(active_job, overrides = {}) if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil? ActiveSupport::Deprecation.warn(<<~DEPRECATION) The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority. To opt-in to this behavior now, set `config.good_job.smaller_number_is_higher_priority = true` in your GoodJob initializer or application.rb. To not opt-in yet, but silence this deprecation warning, set `config.good_job.smaller_number_is_higher_priority = false`. @@ -216,10 +224,11 @@ queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, priority: active_job.priority || DEFAULT_PRIORITY, serialized_params: active_job.serialize, scheduled_at: active_job.scheduled_at, } + execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id current_execution = CurrentThread.execution @@ -236,11 +245,11 @@ end execution_args[:cron_key] = CurrentThread.cron_key execution_args[:cron_at] = CurrentThread.cron_at end - new(**execution_args.merge(overrides)) + execution_args.merge(overrides) end # Finds the next eligible Execution, acquire an advisory lock related to it, and # executes the job. # @return [ExecutionResult, nil] @@ -296,39 +305,94 @@ # jobs the caller takes care of the persistence and sets this parameter to `false` # @return [Execution] # The new {Execution} instance representing the queued ActiveJob job. def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| - execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + current_execution = CurrentThread.execution - execution.create_with_advisory_lock = create_with_advisory_lock - instrument_payload[:execution] = execution + retried = current_execution && current_execution.active_job_id == active_job.job_id + if retried + if current_execution.discrete? + execution = current_execution + execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at })) + execution.scheduled_at ||= Time.current + execution.performed_at = nil + execution.finished_at = nil + else + execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + end + else + execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + execution.make_discrete if discrete_support? + end + if create_with_advisory_lock + if execution.persisted? + execution.advisory_lock + else + execution.create_with_advisory_lock = true + end + end + + instrument_payload[:execution] = execution execution.save! - active_job.provider_job_id = execution.id - CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id + CurrentThread.execution.retried_good_job_id = execution.id if retried && !CurrentThread.execution.discrete? + active_job.provider_job_id = execution.id execution end end + def self.format_error(error) + raise ArgumentError unless error.is_a?(Exception) + + [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join + end + # Execute the ActiveJob job this {Execution} represents. # @return [ExecutionResult] # An array of the return value of the job's +#perform+ method and the # exception raised by the job, if any. If the job completed successfully, # the second array entry (the exception) will be +nil+ and vice versa. def perform run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at + discrete_execution = nil result = GoodJob::CurrentThread.within do |current_thread| current_thread.reset current_thread.execution = self - current_thread.execution_interrupted = performed_at if performed_at - update!(performed_at: Time.current) + if performed_at + current_thread.execution_interrupted = performed_at + if discrete? + interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'")) + self.error = interrupt_error_string + discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all( # rubocop:disable Rails/SkipsModelValidations + error: interrupt_error_string, + finished_at: Time.current + ) + end + end + + if discrete? + transaction do + now = Time.current + discrete_execution = discrete_executions.create!( + job_class: job_class, + queue_name: queue_name, + serialized_params: serialized_params, + scheduled_at: (scheduled_at || created_at), + created_at: now + ) + update!(performed_at: now, executions_count: ((executions_count || 0) + 1)) + end + else + update!(performed_at: Time.current) + end + ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| value = ActiveJob::Base.execute(active_job_data) if value.is_a?(Exception) handled_error = value @@ -347,18 +411,46 @@ ExecutionResult.new(value: nil, unhandled_error: e) end end job_error = result.handled_error || result.unhandled_error - self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error + if job_error + error_string = self.class.format_error(job_error) + self.error = error_string + discrete_execution.error = error_string if discrete_execution + else + self.error = nil + end + reenqueued = result.retried? || retried_good_job_id.present? if result.unhandled_error && GoodJob.retry_on_unhandled_error - save! + if discrete_execution + transaction do + discrete_execution.update!(finished_at: Time.current) + update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil) + end + else + save! + end elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present? - self.finished_at = Time.current - save! + now = Time.current + if discrete_execution + if reenqueued + self.performed_at = nil + else + self.finished_at = now + end + discrete_execution.finished_at = now + transaction do + discrete_execution.save! + save! + end + else + self.finished_at = now + save! + end else destroy_job end result @@ -367,9 +459,20 @@ # Tests whether this job is safe to be executed by this thread. # @return [Boolean] def executable? self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) + end + + def make_discrete + self.is_discrete = true + self.id = active_job_id + self.job_class = serialized_params['job_class'] + self.executions_count ||= 0 + + current_time = Time.current + self.created_at ||= current_time + self.scheduled_at ||= current_time end # Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`. # # @param ignore_deserialization_errors [Boolean]