lib/good_job/job.rb in good_job-1.9.1 vs lib/good_job/job.rb in good_job-1.9.2

- old
+ new

@@ -1,9 +1,10 @@ module GoodJob - # - # Represents a request to perform an +ActiveJob+ job. - # + # ActiveRecord model that represents an +ActiveJob+ job. + # Parent class can be configured with +GoodJob.active_record_parent_class+. + # @!parse + # class Job < ActiveRecord::Base; end class Job < Object.const_get(GoodJob.active_record_parent_class) include Lockable # Raised if something attempts to execute a previously completed Job again. PreviouslyPerformedError = Class.new(StandardError) @@ -17,10 +18,11 @@ attr_readonly :serialized_params # Parse a string representing a group of queues into a more readable data # structure. + # @param string [String] Queue string # @return [Hash] # How to match a given queue. It can have the following keys and values: # - +{ all: true }+ indicates that all queues match. # - +{ exclude: Array<String> }+ indicates the listed queue names should # not match. @@ -132,33 +134,30 @@ query end) # Finds the next eligible Job, acquire an advisory lock related to it, and # executes the job. - # @return [Array<(GoodJob::Job, Object, Exception)>, nil] + # @return [ExecutionResult, nil] # If a job was executed, returns an array with the {Job} record, the # return value for the job's +#perform+ method, and the exception the job # raised, if any (if the job raised, then the second array entry will be # +nil+). If there were no jobs to execute, returns +nil+. def self.perform_with_advisory_lock - good_job = nil - result = nil - error = nil - unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs| good_job = good_jobs.first # TODO: Determine why some records are fetched without an advisory lock at all break unless good_job&.executable? - result, error = good_job.perform + good_job.perform end - - [good_job, result, error] if good_job end # Fetches the scheduled execution time of the next eligible Job(s). - # @return [Array<(DateTime)>] + # @param after [DateTime] + # @param limit [Integer] + # @param now_limit [Integer, nil] + # @return [Array<DateTime>] def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) query = advisory_unlocked.unfinished.schedule_ordered after ||= Time.current after_query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after) @@ -180,11 +179,10 @@ # @param create_with_advisory_lock [Boolean] # Whether to establish a lock on the {Job} record after it is created. # @return [Job] # The new {Job} instance representing the queued ActiveJob job. def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) - good_job = nil ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| good_job = GoodJob::Job.new( queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, priority: active_job.priority || DEFAULT_PRIORITY, serialized_params: active_job.serialize, @@ -194,71 +192,69 @@ instrument_payload[:good_job] = good_job good_job.save! active_job.provider_job_id = good_job.id - end - good_job + good_job + end end # Execute the ActiveJob job this {Job} represents. - # @return [Array<(Object, Exception)>] + # @return [ExecutionResult] # An array of the return value of the job's +#perform+ method and the # exception raised by the job, if any. If the job completed successfully, # the second array entry (the exception) will be +nil+ and vice versa. def perform raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at - GoodJob::CurrentExecution.reset - self.performed_at = Time.current save! if GoodJob.preserve_job_records - result, unhandled_error = execute + result = execute - result_error = nil - if result.is_a?(Exception) - result_error = result - result = nil - end - - job_error = unhandled_error || - result_error || - GoodJob::CurrentExecution.error_on_retry || - GoodJob::CurrentExecution.error_on_discard - + job_error = result.handled_error || result.unhandled_error self.error = "#{job_error.class}: #{job_error.message}" if job_error - if unhandled_error && GoodJob.retry_on_unhandled_error + if result.unhandled_error && GoodJob.retry_on_unhandled_error save! - elsif GoodJob.preserve_job_records == true || (unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) + elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) self.finished_at = Time.current save! else destroy! end - [result, job_error] + result end # Tests whether this job is safe to be executed by this thread. # @return [Boolean] def executable? self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) end private + # @return [GoodJob::ExecutionResult] def execute params = serialized_params.merge( "provider_job_id" => id ) + GoodJob::CurrentExecution.reset ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do - [ActiveJob::Base.execute(params), nil] + value = ActiveJob::Base.execute(params) + + if value.is_a?(Exception) + handled_error = value + value = nil + end + handled_error ||= GoodJob::CurrentExecution.error_on_retry || GoodJob::CurrentExecution.error_on_discard + + ExecutionResult.new(value: value, handled_error: handled_error) + rescue StandardError => e + ExecutionResult.new(value: nil, unhandled_error: e) end - rescue StandardError => e - [nil, e] end end end