lib/good_job/scheduler.rb in good_job-3.21.0 vs lib/good_job/scheduler.rb in good_job-3.21.1

- old
+ new

@@ -2,11 +2,10 @@ require "concurrent/executor/thread_pool_executor" require "concurrent/executor/timer_set" require "concurrent/scheduled_task" require "concurrent/utility/processor_counter" -require 'good_job/metrics' module GoodJob # :nodoc: # # Schedulers are generic thread pools that are responsible for # periodically checking for available tasks, executing tasks within a thread, @@ -34,37 +33,10 @@ # @!scope class # List of all instantiated Schedulers in the current process. # @return [Array<GoodJob::Scheduler>, nil] cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false - # Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance. - # @param configuration [GoodJob::Configuration] - # @param warm_cache_on_initialize [Boolean] - # @return [GoodJob::Scheduler, GoodJob::MultiScheduler] - def self.from_configuration(configuration, warm_cache_on_initialize: false) - schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads| - queue_string, max_threads = queue_string_and_max_threads.split(':') - max_threads = (max_threads || configuration.max_threads).to_i - - job_performer = GoodJob::JobPerformer.new(queue_string) - GoodJob::Scheduler.new( - job_performer, - max_threads: max_threads, - max_cache: configuration.max_cache, - warm_cache_on_initialize: warm_cache_on_initialize, - cleanup_interval_seconds: configuration.cleanup_interval_seconds, - cleanup_interval_jobs: configuration.cleanup_interval_jobs - ) - end - - if schedulers.size > 1 - GoodJob::MultiScheduler.new(schedulers) - else - schedulers.first - end - end - # Human readable name of the scheduler that includes configuration values. # @return [String] attr_reader :name # @param performer [GoodJob::JobPerformer] @@ -86,11 +58,10 @@ end @name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" @executor_options[:name] = name @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs) - @metrics = ::GoodJob::Metrics.new @executor_options[:name] = name create_executor warm_cache if warm_cache_on_initialize self.class.instances << self @@ -141,29 +112,30 @@ def restart(timeout: -1) raise ArgumentError, "Scheduler#restart cannot be called with a timeout of nil" if timeout.nil? instrument("scheduler_restart_pools") do shutdown(timeout: timeout) - @metrics.reset + @performer.reset_stats create_executor warm_cache end end # Wakes a thread to allow the performer to execute a task. # @param state [Hash, nil] Contextual information for the performer. See {JobPerformer#next?}. # @return [Boolean, nil] Whether work was started. - # # * +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity. # * +true+ if the performer started executing work. # * +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it. def create_thread(state = nil) return nil unless executor.running? - if state + if state.present? return false unless performer.next?(state) + fanout = state&.fetch(:fanout, nil) + if state[:count] # When given state for multiple jobs, try to create a thread for each one. # Return true if a thread can be created for all of them, nil if partial or none. state_without_count = state.without(:count) @@ -191,11 +163,11 @@ return nil unless executor.ready_worker_count.positive? elsif @max_cache.positive? return nil unless remaining_cache_count.positive? end - create_task(delay) + create_task(delay, fanout: fanout) run_now ? true : nil end # Invoked on completion of ThreadPoolExecutor task @@ -205,20 +177,10 @@ result = output.is_a?(GoodJob::ExecutionResult) ? output : nil unhandled_error = thread_error || result&.unhandled_error GoodJob._on_thread_error(unhandled_error) if unhandled_error - if unhandled_error || result&.handled_error - @metrics.increment_errored_executions - elsif result&.unexecutable - @metrics.increment_unexecutable_executions - elsif result - @metrics.increment_succeeded_executions - else - @metrics.increment_empty_executions - end - instrument("finished_job_task", { result: output, error: thread_error, time: time }) return unless output @cleanup_tracker.increment if @cleanup_tracker.cleanup? @@ -230,20 +192,21 @@ # Information about the Scheduler # @return [Hash] def stats available_threads = executor.ready_worker_count + { name: name, queues: performer.name, max_threads: @executor_options[:max_threads], active_threads: @executor_options[:max_threads] - available_threads, available_threads: available_threads, max_cache: @max_cache, active_cache: cache_count, available_cache: remaining_cache_count, - }.merge!(@metrics.to_h) + }.merge!(@performer.stats.without(:name)) end # Preload existing runnable and future-scheduled jobs # @return [void] def warm_cache @@ -298,15 +261,18 @@ @executor = ThreadPoolExecutor.new(@executor_options) end end # @param delay [Integer] + # @param fanout [Boolean] Whether to eagerly create a 2nd execution thread if a job is found. # @return [void] - def create_task(delay = 0) - future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer| + def create_task(delay = 0, fanout: false) + future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer| Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name Rails.application.reloader.wrap do - thr_performer.next + thr_performer.next do |found| + thr_scheduler.create_thread({ fanout: fanout }) if found && fanout + end end end future.add_observer(self, :task_observer) future.execute end