lib/good_job/job.rb in good_job-2.1.0 vs lib/good_job/job.rb in good_job-2.2.0

- old
+ new

@@ -1,299 +1,11 @@ # frozen_string_literal: true module GoodJob - # ActiveRecord model that represents an +ActiveJob+ job. - # Parent class can be configured with +GoodJob.active_record_parent_class+. - # @!parse - # class Job < ActiveRecord::Base; end - class Job < Object.const_get(GoodJob.active_record_parent_class) - include Lockable - - # Raised if something attempts to execute a previously completed Job again. - PreviouslyPerformedError = Class.new(StandardError) - - # ActiveJob jobs without a +queue_name+ attribute are placed on this queue. - DEFAULT_QUEUE_NAME = 'default' - # ActiveJob jobs without a +priority+ attribute are given this priority. - DEFAULT_PRIORITY = 0 - - self.table_name = 'good_jobs' - self.advisory_lockable_column = 'active_job_id' - - # Parse a string representing a group of queues into a more readable data - # structure. - # @param string [String] Queue string - # @return [Hash] - # How to match a given queue. It can have the following keys and values: - # - +{ all: true }+ indicates that all queues match. - # - +{ exclude: Array<String> }+ indicates the listed queue names should - # not match. - # - +{ include: Array<String> }+ indicates the listed queue names should - # match. - # @example - # GoodJob::Job.queue_parser('-queue1,queue2') - # => { exclude: [ 'queue1', 'queue2' ] } - def self.queue_parser(string) - string = string.presence || '*' - - if string.first == '-' - exclude_queues = true - string = string[1..-1] - end - - queues = string.split(',').map(&:strip) - - if queues.include?('*') - { all: true } - elsif exclude_queues - { exclude: queues } - else - { include: queues } - end - end - - # Get Jobs with given class name - # @!method with_job_class - # @!scope class - # @param string [String] - # Job class name - # @return [ActiveRecord::Relation] - scope :with_job_class, ->(job_class) { where("serialized_params->>'job_class' = ?", job_class) } - - # Get Jobs that have not yet been completed. - # @!method unfinished - # @!scope class - # @return [ActiveRecord::Relation] - scope :unfinished, -> { where(finished_at: nil) } - - # Get Jobs that are not scheduled for a later time than now (i.e. jobs that - # are not scheduled or scheduled for earlier than the current time). - # @!method only_scheduled - # @!scope class - # @return [ActiveRecord::Relation] - scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) } - - # Order jobs by priority (highest priority first). - # @!method priority_ordered - # @!scope class - # @return [ActiveRecord::Relation] - scope :priority_ordered, -> { order('priority DESC NULLS LAST') } - - # Order jobs by scheduled (unscheduled or soonest first). - # @!method schedule_ordered - # @!scope class - # @return [ActiveRecord::Relation] - scope :schedule_ordered, -> { order(Arel.sql('COALESCE(scheduled_at, created_at) ASC')) } - - # Get Jobs were completed before the given timestamp. If no timestamp is - # provided, get all jobs that have been completed. By default, GoodJob - # deletes jobs after they are completed and this will find no jobs. - # However, if you have changed {GoodJob.preserve_job_records}, this may - # find completed Jobs. - # @!method finished(timestamp = nil) - # @!scope class - # @param timestamp (Float) - # Get jobs that finished before this time (in epoch time). - # @return [ActiveRecord::Relation] - scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) } - - # Get Jobs that started but not finished yet. - # @!method running - # @!scope class - # @return [ActiveRecord::Relation] - scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) } - - # Get Jobs that do not have subsequent retries - # @!method running - # @!scope class - # @return [ActiveRecord::Relation] - scope :head, -> { where(retried_good_job_id: nil) } - - # Get Jobs have errored that will not be retried further - # @!method running - # @!scope class - # @return [ActiveRecord::Relation] - scope :dead, -> { head.where.not(error: nil) } - - # Get Jobs on queues that match the given queue string. - # @!method queue_string(string) - # @!scope class - # @param string [String] - # A string expression describing what queues to select. See - # {Job.queue_parser} or - # {file:README.md#optimize-queues-threads-and-processes} for more details - # on the format of the string. Note this only handles individual - # semicolon-separated segments of that string format. - # @return [ActiveRecord::Relation] - scope :queue_string, (lambda do |string| - parsed = queue_parser(string) - - if parsed[:all] - all - elsif parsed[:exclude] - where.not(queue_name: parsed[:exclude]).or where(queue_name: nil) - elsif parsed[:include] - where(queue_name: parsed[:include]) - end - end) - - # Get Jobs in display order with optional keyset pagination. - # @!method display_all(after_scheduled_at: nil, after_id: nil) - # @!scope class - # @param after_scheduled_at [DateTime, String, nil] - # Display records scheduled after this time for keyset pagination - # @param after_id [Numeric, String, nil] - # Display records after this ID for keyset pagination - # @return [ActiveRecord::Relation] - scope :display_all, (lambda do |after_scheduled_at: nil, after_id: nil| - query = order(Arel.sql('COALESCE(scheduled_at, created_at) DESC, id DESC')) - if after_scheduled_at.present? && after_id.present? - query = query.where(Arel.sql('(COALESCE(scheduled_at, created_at), id) < (:after_scheduled_at, :after_id)'), after_scheduled_at: after_scheduled_at, after_id: after_id) - elsif after_scheduled_at.present? - query = query.where(Arel.sql('(COALESCE(scheduled_at, created_at)) < (:after_scheduled_at)'), after_scheduled_at: after_scheduled_at) - end - query - end) - - # Finds the next eligible Job, acquire an advisory lock related to it, and - # executes the job. - # @return [ExecutionResult, nil] - # If a job was executed, returns an array with the {Job} record, the - # return value for the job's +#perform+ method, and the exception the job - # raised, if any (if the job raised, then the second array entry will be - # +nil+). If there were no jobs to execute, returns +nil+. - def self.perform_with_advisory_lock - unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |good_jobs| - good_job = good_jobs.first - break if good_job.blank? - break :unlocked unless good_job&.executable? - - begin - good_job.with_advisory_lock(key: "good_jobs-#{good_job.active_job_id}") do - good_job.perform - end - rescue RecordAlreadyAdvisoryLockedError => e - ExecutionResult.new(value: nil, handled_error: e) - end - end - end - - # Fetches the scheduled execution time of the next eligible Job(s). - # @param after [DateTime] - # @param limit [Integer] - # @param now_limit [Integer, nil] - # @return [Array<DateTime>] - def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) - query = advisory_unlocked.unfinished.schedule_ordered - - after ||= Time.current - after_query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after) - after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first } - - if now_limit&.positive? - now_query = query.where('scheduled_at < ?', Time.current).or query.where(scheduled_at: nil) - now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first } - end - - Array(now_at) + after_at - end - - # Places an ActiveJob job on a queue by creating a new {Job} record. - # @param active_job [ActiveJob::Base] - # The job to enqueue. - # @param scheduled_at [Float] - # Epoch timestamp when the job should be executed. - # @param create_with_advisory_lock [Boolean] - # Whether to establish a lock on the {Job} record after it is created. - # @return [Job] - # The new {Job} instance representing the queued ActiveJob job. - def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) - ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| - good_job_args = { - active_job_id: active_job.job_id, - queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, - priority: active_job.priority || DEFAULT_PRIORITY, - serialized_params: active_job.serialize, - scheduled_at: scheduled_at, - create_with_advisory_lock: create_with_advisory_lock, - } - - good_job_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) - - if CurrentExecution.cron_key - good_job_args[:cron_key] = CurrentExecution.cron_key - elsif CurrentExecution.active_job_id == active_job.job_id - good_job_args[:cron_key] = CurrentExecution.good_job.cron_key - end - - good_job = GoodJob::Job.new(**good_job_args) - - instrument_payload[:good_job] = good_job - - good_job.save! - active_job.provider_job_id = good_job.id - - CurrentExecution.good_job.retried_good_job_id = good_job.id if CurrentExecution.good_job && CurrentExecution.good_job.active_job_id == active_job.job_id - - good_job - end - end - - # Execute the ActiveJob job this {Job} represents. - # @return [ExecutionResult] - # An array of the return value of the job's +#perform+ method and the - # exception raised by the job, if any. If the job completed successfully, - # the second array entry (the exception) will be +nil+ and vice versa. - def perform - raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at - - self.performed_at = Time.current - save! if GoodJob.preserve_job_records - - result = execute - - job_error = result.handled_error || result.unhandled_error - self.error = "#{job_error.class}: #{job_error.message}" if job_error - - if result.unhandled_error && GoodJob.retry_on_unhandled_error - save! - elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) - self.finished_at = Time.current - save! - else - destroy! - end - - result - end - - # Tests whether this job is safe to be executed by this thread. - # @return [Boolean] - def executable? - self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) - end - - private - - # @return [ExecutionResult] - def execute - GoodJob::CurrentExecution.reset - GoodJob::CurrentExecution.good_job = self - - job_data = serialized_params.deep_dup - job_data["provider_job_id"] = id - - ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do - value = ActiveJob::Base.execute(job_data) - - if value.is_a?(Exception) - handled_error = value - value = nil - end - handled_error ||= GoodJob::CurrentExecution.error_on_retry || GoodJob::CurrentExecution.error_on_discard - - ExecutionResult.new(value: value, handled_error: handled_error) - rescue StandardError => e - ExecutionResult.new(value: nil, unhandled_error: e) - end + # @deprecated Use {GoodJob::Execution} instead. + class Job < Execution + after_initialize do |_job| + ActiveSupport::Deprecation.warn( + "The `GoodJob::Job` class name is deprecated. Replace with `GoodJob::Execution`." + ) end end end