lib/good_job/job.rb in good_job-1.2.4 vs lib/good_job/job.rb in good_job-1.2.5

- old
+ new

@@ -1,16 +1,34 @@ module GoodJob + # + # Represents a request to perform an +ActiveJob+ job. + # class Job < ActiveRecord::Base include Lockable + # Raised if something attempts to execute a previously completed Job again. PreviouslyPerformedError = Class.new(StandardError) + # ActiveJob jobs without a +queue_name+ attribute are placed on this queue. DEFAULT_QUEUE_NAME = 'default'.freeze + # ActiveJob jobs without a +priority+ attribute are given this priority. DEFAULT_PRIORITY = 0 self.table_name = 'good_jobs'.freeze + # Parse a string representing a group of queues into a more readable data + # structure. + # @return [Hash] + # How to match a given queue. It can have the following keys and values: + # - +{ all: true }+ indicates that all queues match. + # - +{ exclude: Array<String> }+ indicates the listed queue names should + # not match. + # - +{ include: Array<String> }+ indicates the listed queue names should + # match. + # @example + # GoodJob::Job.queue_parser('-queue1,queue2') + # => { exclude: [ 'queue1', 'queue2' ] } def self.queue_parser(string) string = string.presence || '*' if string.first == '-' exclude_queues = true @@ -26,21 +44,58 @@ else { include: queues } end end + # Get Jobs that have not yet been completed. + # @!method unfinished + # @!scope class + # @return [ActiveRecord::Relation] scope :unfinished, (lambda do if column_names.include?('finished_at') where(finished_at: nil) else ActiveSupport::Deprecation.warn('GoodJob expects a good_jobs.finished_at column to exist. Please see the GoodJob README.md for migration instructions.') nil end end) + + # Get Jobs that are not scheduled for a later time than now (i.e. jobs that + # are not scheduled or scheduled for earlier than the current time). + # @!method only_scheduled + # @!scope class + # @return [ActiveRecord::Relation] scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) } + + # Order jobs by priority (highest priority first). + # @!method priority_ordered + # @!scope class + # @return [ActiveRecord::Relation] scope :priority_ordered, -> { order('priority DESC NULLS LAST') } + + # Get Jobs were completed before the given timestamp. If no timestamp is + # provided, get all jobs that have been completed. By default, GoodJob + # deletes jobs after they are completed and this will find no jobs. + # However, if you have changed {GoodJob.preserve_job_records}, this may + # find completed Jobs. + # @!method finished(timestamp = nil) + # @!scope class + # @param timestamp (Float) + # Get jobs that finished before this time (in epoch time). + # @return [ActiveRecord::Relation] scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) } + + # Get Jobs on queues that match the given queue string. + # @!method queue_string(string) + # @!scope class + # @param string [String] + # A string expression describing what queues to select. See + # {Job.queue_parser} or + # {file:README.md#optimize-queues-threads-and-processes} for more details + # on the format of the string. Note this only handles individual + # semicolon-separated segments of that string format. + # @return [ActiveRecord::Relation] scope :queue_string, (lambda do |string| parsed = queue_parser(string) if parsed[:all] all @@ -49,10 +104,35 @@ elsif parsed[:include] where(queue_name: parsed[:include]) end end) + # Get Jobs in display order with optional keyset pagination. + # @!method display_all(after_scheduled_at: nil, after_id: nil) + # @!scope class + # @param after_scheduled_at [DateTime, String, nil] + # Display records scheduled after this time for keyset pagination + # @param after_id [Numeric, String, nil] + # Display records after this ID for keyset pagination + # @return [ActiveRecord::Relation] + scope :display_all, (lambda do |after_scheduled_at: nil, after_id: nil| + query = order(Arel.sql('COALESCE(scheduled_at, created_at) DESC, id DESC')) + if after_scheduled_at.present? && after_id.present? + query = query.where(Arel.sql('(COALESCE(scheduled_at, created_at), id) < (:after_scheduled_at, :after_id)'), after_scheduled_at: after_scheduled_at, after_id: after_id) + elsif after_scheduled_at.present? + query = query.where(Arel.sql('(COALESCE(scheduled_at, created_at)) < (:after_scheduled_at)'), after_scheduled_at: after_scheduled_at) + end + query + end) + + # Finds the next eligible Job, acquire an advisory lock related to it, and + # executes the job. + # @return [Array<(GoodJob::Job, Object, Exception)>, nil] + # If a job was executed, returns an array with the {Job} record, the + # return value for the job's +#perform+ method, and the exception the job + # raised, if any (if the job raised, then the second array entry will be + # +nil+). If there were no jobs to execute, returns +nil+. def self.perform_with_advisory_lock good_job = nil result = nil error = nil @@ -65,10 +145,19 @@ end [good_job, result, error] if good_job end + # Places an ActiveJob job on a queue by creating a new {Job} record. + # @param active_job [ActiveJob::Base] + # The job to enqueue. + # @param scheduled_at [Float] + # Epoch timestamp when the job should be executed. + # @param create_with_advisory_lock [Boolean] + # Whether to establish a lock on the {Job} record after it is created. + # @return [Job] + # The new {Job} instance representing the queued ActiveJob job. def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) good_job = nil ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| good_job = GoodJob::Job.new( queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, @@ -85,36 +174,29 @@ end good_job end - def perform(destroy_after: !GoodJob.preserve_job_records, reperform_on_standard_error: GoodJob.reperform_jobs_on_standard_error) + # Execute the ActiveJob job this {Job} represents. + # @return [Array<(Object, Exception)>] + # An array of the return value of the job's +#perform+ method and the + # exception raised by the job, if any. If the job completed successfully, + # the second array entry (the exception) will be +nil+ and vice versa. + def perform raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at GoodJob::CurrentExecution.reset - result = nil - rescued_error = nil - error = nil self.performed_at = Time.current - save! unless destroy_after + save! if GoodJob.preserve_job_records - params = serialized_params.merge( - "provider_job_id" => id - ) + result, rescued_error = execute - begin - ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do - result = ActiveJob::Base.execute(params) - end - rescue StandardError => e - rescued_error = e - end - retry_or_discard_error = GoodJob::CurrentExecution.error_on_retry || GoodJob::CurrentExecution.error_on_discard + error = nil if rescued_error error = rescued_error elsif result.is_a?(Exception) error = result result = nil @@ -123,21 +205,35 @@ end error_message = "#{error.class}: #{error.message}" if error self.error = error_message - if rescued_error && reperform_on_standard_error + if rescued_error && GoodJob.reperform_jobs_on_standard_error save! else self.finished_at = Time.current - if destroy_after - destroy! - else + if GoodJob.preserve_job_records save! + else + destroy! end end [result, error] + end + + private + + def execute + params = serialized_params.merge( + "provider_job_id" => id + ) + + ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do + [ActiveJob::Base.execute(params), nil] + end + rescue StandardError => e + [nil, e] end end end