lib/good_job/scheduler.rb in good_job-3.15.14 vs lib/good_job/scheduler.rb in good_job-3.16.0

- old
+ new

@@ -1,10 +1,12 @@ # frozen_string_literal: true + require "concurrent/executor/thread_pool_executor" require "concurrent/executor/timer_set" require "concurrent/scheduled_task" require "concurrent/utility/processor_counter" +require 'good_job/metrics' module GoodJob # :nodoc: # # Schedulers are generic thread pools that are responsible for # periodically checking for available tasks, executing tasks within a thread, @@ -30,11 +32,11 @@ # @!attribute [r] instances # @!scope class # List of all instantiated Schedulers in the current process. # @return [Array<GoodJob::Scheduler>, nil] - cattr_reader :instances, default: [], instance_reader: false + cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false # Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance. # @param configuration [GoodJob::Configuration] # @param warm_cache_on_initialize [Boolean] # @return [GoodJob::Scheduler, GoodJob::MultiScheduler] @@ -72,12 +74,10 @@ # @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records # @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) - self.class.instances << self - @performer = performer @max_cache = max_cache || 0 @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup if max_threads.present? @@ -86,12 +86,16 @@ end @name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" @executor_options[:name] = name @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs) + @metrics = ::GoodJob::Metrics.new + @executor_options[:name] = name + create_executor warm_cache if warm_cache_on_initialize + self.class.instances << self end # Tests whether the scheduler is running. # @return [Boolean, nil] delegate :running?, to: :executor, allow_nil: true @@ -137,10 +141,11 @@ def restart(timeout: -1) raise ArgumentError, "Scheduler#restart cannot be called with a timeout of nil" if timeout.nil? instrument("scheduler_restart_pools") do shutdown(timeout: timeout) + @metrics.reset create_executor warm_cache end end @@ -195,13 +200,25 @@ # Invoked on completion of ThreadPoolExecutor task # @!visibility private # @return [void] def task_observer(time, output, thread_error) - error = thread_error || (output.is_a?(GoodJob::ExecutionResult) ? output.unhandled_error : nil) - GoodJob._on_thread_error(error) if error + result = output.is_a?(GoodJob::ExecutionResult) ? output : nil + unhandled_error = thread_error || result&.unhandled_error + GoodJob._on_thread_error(unhandled_error) if unhandled_error + + if unhandled_error || result&.handled_error + @metrics.increment_errored_executions + elsif result&.unexecutable + @metrics.increment_unexecutable_executions + elsif result + @metrics.increment_succeeded_executions + else + @metrics.increment_empty_executions + end + instrument("finished_job_task", { result: output, error: thread_error, time: time }) return unless output @cleanup_tracker.increment if @cleanup_tracker.cleanup? @@ -212,19 +229,21 @@ end # Information about the Scheduler # @return [Hash] def stats + available_threads = executor.ready_worker_count { - name: performer.name, + name: name, + queues: performer.name, max_threads: @executor_options[:max_threads], - active_threads: @executor_options[:max_threads] - executor.ready_worker_count, - available_threads: executor.ready_worker_count, + active_threads: @executor_options[:max_threads] - available_threads, + available_threads: available_threads, max_cache: @max_cache, active_cache: cache_count, available_cache: remaining_cache_count, - } + }.merge!(@metrics.to_h) end # Preload existing runnable and future-scheduled jobs # @return [void] def warm_cache @@ -314,11 +333,11 @@ @max_cache - cache_count end # Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status. # @private - class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor + class ThreadPoolExecutor < ::Concurrent::ThreadPoolExecutor # Number of inactive threads available to execute tasks. # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437 # @return [Integer] def ready_worker_count synchronize do @@ -333,10 +352,10 @@ end end # Custom sub-class of +Concurrent::TimerSet+ for additional behavior. # @private - class TimerSet < Concurrent::TimerSet + class TimerSet < ::Concurrent::TimerSet # Number of scheduled jobs in the queue # @return [Integer] def length @queue.length end