app/models/good_job/execution.rb in good_job-3.30.0 vs app/models/good_job/execution.rb in good_job-3.30.1

- old
+ new

@@ -1,614 +1,12 @@ # frozen_string_literal: true module GoodJob # Active Record model that represents an +ActiveJob+ job. + # Most behavior is currently in BaseExecution in anticipation of + # moving behavior into +GoodJob::Job+. class Execution < BaseExecution - # Raised if something attempts to execute a previously completed Execution again. - PreviouslyPerformedError = Class.new(StandardError) - - # String separating Error Class from Error Message - ERROR_MESSAGE_SEPARATOR = ": " - - # ActiveJob jobs without a +queue_name+ attribute are placed on this queue. - DEFAULT_QUEUE_NAME = 'default' - # ActiveJob jobs without a +priority+ attribute are given this priority. - DEFAULT_PRIORITY = 0 - self.table_name = 'good_jobs' - self.advisory_lockable_column = 'active_job_id' - self.implicit_order_column = 'created_at' - define_model_callbacks :perform - define_model_callbacks :perform_unlocked, only: :after - - set_callback :perform, :around, :reset_batch_values - set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch - - # Parse a string representing a group of queues into a more readable data - # structure. - # @param string [String] Queue string - # @return [Hash] - # How to match a given queue. It can have the following keys and values: - # - +{ all: true }+ indicates that all queues match. - # - +{ exclude: Array<String> }+ indicates the listed queue names should - # not match. - # - +{ include: Array<String> }+ indicates the listed queue names should - # match. - # - +{ include: Array<String>, ordered_queues: true }+ indicates the listed - # queue names should match, and dequeue should respect queue order. - # @example - # GoodJob::Execution.queue_parser('-queue1,queue2') - # => { exclude: [ 'queue1', 'queue2' ] } - def self.queue_parser(string) - string = string.strip.presence || '*' - - case string.first - when '-' - exclude_queues = true - string = string[1..] - when '+' - ordered_queues = true - string = string[1..] - end - - queues = string.split(',').map(&:strip) - - if queues.include?('*') - { all: true } - elsif exclude_queues - { exclude: queues } - elsif ordered_queues - { - include: queues, - ordered_queues: true, - } - else - { include: queues } - end - end - - belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions - has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent - - after_destroy lambda { - GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4 - self.class.active_job_id(active_job_id).delete_all - }, if: -> { @_destroy_job } - - # Get executions with given ActiveJob ID - # @!method active_job_id(active_job_id) - # @!scope class - # @param active_job_id [String] - # ActiveJob ID - # @return [ActiveRecord::Relation] - scope :active_job_id, ->(active_job_id) { where(active_job_id: active_job_id) } - - # Get executions that have not yet finished (succeeded or discarded). - # @!method unfinished - # @!scope class - # @return [ActiveRecord::Relation] - scope :unfinished, -> { where(finished_at: nil) } - - # Get executions that are not scheduled for a later time than now (i.e. jobs that - # are not scheduled or scheduled for earlier than the current time). - # @!method only_scheduled - # @!scope class - # @return [ActiveRecord::Relation] - scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) } - - # Order executions by priority (highest priority first). - # @!method priority_ordered - # @!scope class - # @return [ActiveRecord::Relation] - scope :priority_ordered, (lambda do - if GoodJob.configuration.smaller_number_is_higher_priority - order('priority ASC NULLS LAST') - else - order('priority DESC NULLS LAST') - end - end) - - # Order executions by created_at, for first-in first-out - # @!method creation_ordered - # @!scope class - # @return [ActiveRecord:Relation] - scope :creation_ordered, -> { order(created_at: :asc) } - - # Order executions for de-queueing - # @!method dequeueing_ordered(parsed_queues) - # @!scope class - # @param parsed_queues [Hash] - # optional output of .queue_parser, parsed queues, will be used for - # ordered queues. - # @return [ActiveRecord::Relation] - scope :dequeueing_ordered, (lambda do |parsed_queues| - relation = self - relation = relation.queue_ordered(parsed_queues[:include]) if parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include] - relation = relation.priority_ordered.creation_ordered - - relation - end) - - # Order executions in order of queues in array param - # @!method queue_ordered(queues) - # @!scope class - # @param queues [Array<string] ordered names of queues - # @return [ActiveRecord::Relation] - scope :queue_ordered, (lambda do |queues| - clauses = queues.map.with_index do |queue_name, index| - "WHEN queue_name = '#{queue_name}' THEN #{index}" - end - - order( - Arel.sql("(CASE #{clauses.join(' ')} ELSE #{queues.length} END)") - ) - end) - - # Order jobs by scheduled or created (oldest first). - # @!method schedule_ordered - # @!scope class - # @return [ActiveRecord::Relation] - scope :schedule_ordered, -> { order(coalesce_scheduled_at_created_at.asc) } - - # Get completed jobs before the given timestamp. If no timestamp is - # provided, get *all* completed jobs. By default, GoodJob - # destroys jobs after they're completed, meaning this returns no jobs. - # However, if you have changed {GoodJob.preserve_job_records}, this may - # find completed Jobs. - # @!method finished(timestamp = nil) - # @!scope class - # @param timestamp (Float) - # Get jobs that finished before this time (in epoch time). - # @return [ActiveRecord::Relation] - scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) : where.not(finished_at: nil) } - - # Get Jobs that started but not finished yet. - # @!method running - # @!scope class - # @return [ActiveRecord::Relation] - scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) } - - # Get Jobs that do not have subsequent retries - # @!method running - # @!scope class - # @return [ActiveRecord::Relation] - scope :head, -> { where(retried_good_job_id: nil) } - - # Get Jobs have errored that will not be retried further - # @!method running - # @!scope class - # @return [ActiveRecord::Relation] - scope :dead, -> { head.where.not(error: nil) } - - # Get Jobs on queues that match the given queue string. - # @!method queue_string(string) - # @!scope class - # @param string [String] - # A string expression describing what queues to select. See - # {Execution.queue_parser} or - # {file:README.md#optimize-queues-threads-and-processes} for more details - # on the format of the string. Note this only handles individual - # semicolon-separated segments of that string format. - # @return [ActiveRecord::Relation] - scope :queue_string, (lambda do |string| - parsed = queue_parser(string) - - if parsed[:all] - all - elsif parsed[:exclude] - where.not(queue_name: parsed[:exclude]).or where(queue_name: nil) - elsif parsed[:include] - where(queue_name: parsed[:include]) - end - end) - - def self.build_for_enqueue(active_job, overrides = {}) - new(**enqueue_args(active_job, overrides)) - end - - # Construct arguments for GoodJob::Execution from an ActiveJob instance. - def self.enqueue_args(active_job, overrides = {}) - if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil? - GoodJob.deprecator.warn(<<~DEPRECATION) - The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority. - To opt-in to this behavior now, set `config.good_job.smaller_number_is_higher_priority = true` in your GoodJob initializer or application.rb. - To not opt-in yet, but silence this deprecation warning, set `config.good_job.smaller_number_is_higher_priority = false`. - DEPRECATION - end - - execution_args = { - active_job_id: active_job.job_id, - queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, - priority: active_job.priority || DEFAULT_PRIORITY, - serialized_params: active_job.serialize, - } - execution_args[:scheduled_at] = Time.zone.at(active_job.scheduled_at) if active_job.scheduled_at - execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) - - if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any? && labels_migrated? - labels = active_job.good_job_labels.dup - labels.map! { |label| label.to_s.strip.presence } - labels.tap(&:compact!).tap(&:uniq!) - execution_args[:labels] = labels - end - - reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id - current_execution = CurrentThread.execution - - if reenqueued_current_execution - if GoodJob::BatchRecord.migrated? - execution_args[:batch_id] = current_execution.batch_id - execution_args[:batch_callback_id] = current_execution.batch_callback_id - end - execution_args[:cron_key] = current_execution.cron_key - else - if GoodJob::BatchRecord.migrated? - execution_args[:batch_id] = GoodJob::Batch.current_batch_id - execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id - end - execution_args[:cron_key] = CurrentThread.cron_key - execution_args[:cron_at] = CurrentThread.cron_at - end - - execution_args.merge(overrides) - end - - # Finds the next eligible Execution, acquire an advisory lock related to it, and - # executes the job. - # @yield [Execution, nil] The next eligible Execution, or +nil+ if none found, before it is performed. - # @return [ExecutionResult, nil] - # If a job was executed, returns an array with the {Execution} record, the - # return value for the job's +#perform+ method, and the exception the job - # raised, if any (if the job raised, then the second array entry will be - # +nil+). If there were no jobs to execute, returns +nil+. - def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) - execution = nil - result = nil - - unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |executions| - execution = executions.first - if execution&.executable? - yield(execution) if block_given? - result = execution.perform(lock_id: lock_id) - else - execution = nil - yield(nil) if block_given? - end - end - - execution&.run_callbacks(:perform_unlocked) - result - end - - # Fetches the scheduled execution time of the next eligible Execution(s). - # @param after [DateTime] - # @param limit [Integer] - # @param now_limit [Integer, nil] - # @return [Array<DateTime>] - def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) - query = advisory_unlocked.unfinished.schedule_ordered - - after ||= Time.current - after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime) - after_query = query.where(arel_table['scheduled_at'].gt(after_bind)).or query.where(scheduled_at: nil).where(arel_table['created_at'].gt(after_bind)) - after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first } - - if now_limit&.positive? - now_query = query.where(arel_table['scheduled_at'].lt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).or query.where(scheduled_at: nil) - now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first } - end - - Array(now_at) + after_at - end - - # Places an ActiveJob job on a queue by creating a new {Execution} record. - # @param active_job [ActiveJob::Base] - # The job to enqueue. - # @param scheduled_at [Float] - # Epoch timestamp when the job should be executed, if blank will delegate to the ActiveJob instance - # @param create_with_advisory_lock [Boolean] - # Whether to establish a lock on the {Execution} record after it is created. - # @return [Execution] - # The new {Execution} instance representing the queued ActiveJob job. - def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) - ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| - current_execution = CurrentThread.execution - - retried = current_execution && current_execution.active_job_id == active_job.job_id - if retried - if current_execution.discrete? - execution = current_execution - execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at })) - execution.scheduled_at ||= Time.current - # TODO: these values ideally shouldn't be persisted until the current_execution is finished - # which will require handling `retry_job` being called from outside the execution context. - execution.performed_at = nil - execution.finished_at = nil - else - execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) - end - else - execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) - execution.make_discrete if discrete_support? - end - - if create_with_advisory_lock - if execution.persisted? - execution.advisory_lock - else - execution.create_with_advisory_lock = true - end - end - - instrument_payload[:execution] = execution - execution.save! - - if retried - CurrentThread.execution_retried = execution - CurrentThread.execution.retried_good_job_id = execution.id unless current_execution.discrete? - else - CurrentThread.execution_retried = nil - end - - active_job.provider_job_id = execution.id - execution - end - end - - def self.format_error(error) - raise ArgumentError unless error.is_a?(Exception) - - [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join - end - - # Execute the ActiveJob job this {Execution} represents. - # @return [ExecutionResult] - # An array of the return value of the job's +#perform+ method and the - # exception raised by the job, if any. If the job completed successfully, - # the second array entry (the exception) will be +nil+ and vice versa. - def perform(lock_id:) - run_callbacks(:perform) do - raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at - - job_performed_at = Time.current - monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - discrete_execution = nil - result = GoodJob::CurrentThread.within do |current_thread| - current_thread.reset - current_thread.execution = self - - existing_performed_at = performed_at - if existing_performed_at - current_thread.execution_interrupted = existing_performed_at - - if discrete? - interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'")) - self.error = interrupt_error_string - self.error_event = ERROR_EVENT_INTERRUPTED if self.class.error_event_migrated? - monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds - - discrete_execution_attrs = { - error: interrupt_error_string, - finished_at: job_performed_at, - } - discrete_execution_attrs[:error_event] = GoodJob::ErrorEvents::ERROR_EVENT_ENUMS[GoodJob::ErrorEvents::ERROR_EVENT_INTERRUPTED] if self.class.error_event_migrated? - discrete_execution_attrs[:duration] = monotonic_duration if GoodJob::DiscreteExecution.monotonic_duration_migrated? - discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all(discrete_execution_attrs) # rubocop:disable Rails/SkipsModelValidations - end - end - - if discrete? - transaction do - discrete_execution_attrs = { - job_class: job_class, - queue_name: queue_name, - serialized_params: serialized_params, - scheduled_at: (scheduled_at || created_at), - created_at: job_performed_at, - } - discrete_execution_attrs[:process_id] = lock_id if GoodJob::DiscreteExecution.columns_hash.key?("process_id") - - execution_attrs = { - performed_at: job_performed_at, - executions_count: ((executions_count || 0) + 1), - } - if GoodJob::Execution.columns_hash.key?("locked_by_id") - execution_attrs[:locked_by_id] = lock_id - execution_attrs[:locked_at] = Time.current - end - - discrete_execution = discrete_executions.create!(discrete_execution_attrs) - update!(execution_attrs) - end - else - execution_attrs = { - performed_at: job_performed_at, - } - if GoodJob::Execution.columns_hash.key?("locked_by_id") - execution_attrs[:locked_by_id] = lock_id - execution_attrs[:locked_at] = Time.current - end - - update!(execution_attrs) - end - - ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| - value = ActiveJob::Base.execute(active_job_data) - - if value.is_a?(Exception) - handled_error = value - value = nil - end - handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard - - error_event = if handled_error == current_thread.error_on_discard - ERROR_EVENT_DISCARDED - elsif handled_error == current_thread.error_on_retry - ERROR_EVENT_RETRIED - elsif handled_error == current_thread.error_on_retry_stopped - ERROR_EVENT_RETRY_STOPPED - elsif handled_error - ERROR_EVENT_HANDLED - end - - instrument_payload.merge!( - value: value, - handled_error: handled_error, - retried: current_thread.execution_retried.present?, - error_event: error_event - ) - ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried: current_thread.execution_retried) - rescue StandardError => e - error_event = if e.is_a?(GoodJob::InterruptError) - ERROR_EVENT_INTERRUPTED - elsif e == current_thread.error_on_retry_stopped - ERROR_EVENT_RETRY_STOPPED - else - ERROR_EVENT_UNHANDLED - end - - instrument_payload[:unhandled_error] = e - ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event) - end - end - - job_attributes = if self.class.columns_hash.key?("locked_by_id") - { locked_by_id: nil, locked_at: nil } - else - {} - end - - job_error = result.handled_error || result.unhandled_error - if job_error - error_string = self.class.format_error(job_error) - - job_attributes[:error] = error_string - job_attributes[:error_event] = result.error_event if self.class.error_event_migrated? - if discrete_execution - discrete_execution.error = error_string - discrete_execution.error_event = result.error_event - discrete_execution.error_backtrace = job_error.backtrace if discrete_execution.class.backtrace_migrated? - end - else - job_attributes[:error] = nil - job_attributes[:error_event] = nil - end - job_attributes.delete(:error_event) unless self.class.error_event_migrated? - - job_finished_at = Time.current - monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds - job_attributes[:finished_at] = job_finished_at - if discrete_execution - discrete_execution.finished_at = job_finished_at - discrete_execution.duration = monotonic_duration if GoodJob::DiscreteExecution.monotonic_duration_migrated? - end - - retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error - reenqueued = result.retried? || retried_good_job_id.present? || retry_unhandled_error - if reenqueued - if discrete_execution - job_attributes[:performed_at] = nil - job_attributes[:finished_at] = nil - else - job_attributes[:retried_good_job_id] = retried_good_job_id - job_attributes[:finished_at] = nil if retry_unhandled_error - end - end - - preserve_unhandled = (result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error)) - if GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present? - if discrete_execution - transaction do - discrete_execution.save! - update!(job_attributes) - end - else - update!(job_attributes) - end - else - destroy_job - end - - result - end - end - - # Tests whether this job is safe to be executed by this thread. - # @return [Boolean] - def executable? - reload.finished_at.blank? - rescue ActiveRecord::RecordNotFound - false - end - - def make_discrete - self.is_discrete = true - self.id = active_job_id - self.job_class = serialized_params['job_class'] - self.executions_count ||= 0 - - current_time = Time.current - self.created_at ||= current_time - self.scheduled_at ||= current_time - end - - # Return formatted serialized_params for display in the dashboard - # @return [Hash] - def display_serialized_params - serialized_params.merge({ - _good_job: attributes.except('serialized_params', 'locktype', 'owns_advisory_lock'), - }) - end - - def running? - if has_attribute?(:locktype) - self['locktype'].present? - else - advisory_locked? - end - end - - def number - serialized_params.fetch('executions', 0) + 1 - end - - # Time between when this job was expected to run and when it started running - def queue_latency - now = Time.zone.now - expected_start = scheduled_at || created_at - actual_start = performed_at || finished_at || now - - actual_start - expected_start unless expected_start >= now - end - - # Time between when this job started and finished - def runtime_latency - (finished_at || Time.zone.now) - performed_at if performed_at - end - - # Destroys this execution and all executions within the same job - def destroy_job - @_destroy_job = true - destroy! - ensure - @_destroy_job = false - end - - def job_state - state = { queue_name: queue_name } - state[:scheduled_at] = scheduled_at if scheduled_at - state - end - - private - - def reset_batch_values(&block) - GoodJob::Batch.within_thread(batch_id: nil, batch_callback_id: nil, &block) - end - - def continue_discard_or_finish_batch - batch._continue_discard_or_finish(self) if GoodJob::BatchRecord.migrated? && batch.present? - end end end