lib/good_job/scheduler.rb in good_job-1.2.4 vs lib/good_job/scheduler.rb in good_job-1.2.5

- old
+ new

@@ -2,26 +2,30 @@ require "concurrent/timer_task" require "concurrent/utility/processor_counter" module GoodJob # :nodoc: # - # Schedulers are generic thread execution pools that are responsible for - # periodically checking for available execution tasks, executing tasks in a - # bounded thread-pool, and efficiently scaling execution threads. + # Schedulers are generic thread pools that are responsible for + # periodically checking for available tasks, executing tasks within a thread, + # and efficiently scaling active threads. # - # Schedulers are "generic" in the sense that they delegate task execution - # details to a "Performer" object that responds to #next. + # Every scheduler has a single {Performer} that will execute tasks. + # The scheduler is responsible for calling its performer efficiently across threads managed by an instance of +Concurrent::ThreadPoolExecutor+. + # If a performer does not have work, the thread will go to sleep. + # The scheduler maintains an instance of +Concurrent::TimerTask+, which wakes sleeping threads and causes them to check whether the performer has new work. # class Scheduler - # Defaults for instance of Concurrent::TimerTask + # Defaults for instance of Concurrent::TimerTask. + # The timer controls how and when sleeping threads check for new work. DEFAULT_TIMER_OPTIONS = { execution_interval: 1, timeout_interval: 1, run_now: true, }.freeze # Defaults for instance of Concurrent::ThreadPoolExecutor + # The thread pool is where work is performed. DEFAULT_POOL_OPTIONS = { name: name, min_threads: 0, max_threads: Concurrent.processor_count, auto_terminate: true, @@ -30,15 +34,16 @@ fallback_policy: :discard, }.freeze # @!attribute [r] instances # @!scope class - # All instantiated Schedulers in the current process. + # List of all instantiated Schedulers in the current process. # @return [array<GoodJob:Scheduler>] cattr_reader :instances, default: [], instance_reader: false # Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance. + # TODO: move this to GoodJob::Configuration # @param configuration [GoodJob::Configuration] # @return [GoodJob::Scheduler, GoodJob::MultiScheduler] def self.from_configuration(configuration) schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads| queue_string, max_threads = queue_string_and_max_threads.split(':') @@ -76,10 +81,11 @@ # @param performer [GoodJob::Performer] # @param timer_options [Hash] Options to instantiate a Concurrent::TimerTask # @param pool_options [Hash] Options to instantiate a Concurrent::ThreadPoolExecutor def initialize(performer, timer_options: {}, pool_options: {}) + # TODO: Replace `timer_options` and `pool_options` with only `poll_interval` and `max_threads` raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) self.class.instances << self @performer = performer @@ -89,57 +95,64 @@ @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})" create_pools end - # Shut down the Scheduler. + # Shut down the scheduler. + # This stops all threads in the pool. + # If +wait+ is +true+, the scheduler will wait for any active tasks to finish. + # If +wait+ is +false+, this method will return immediately even though threads may still be running. + # Use {#shutdown?} to determine whether threads have stopped. # @param wait [Boolean] Wait for actively executing jobs to finish # @return [void] def shutdown(wait: true) @_shutdown = true instrument("scheduler_shutdown_start", { wait: wait }) instrument("scheduler_shutdown", { wait: wait }) do if @timer&.running? @timer.shutdown @timer.wait_for_termination if wait + # TODO: Should be killed if wait is not true end if @pool&.running? @pool.shutdown @pool.wait_for_termination if wait + # TODO: Should be killed if wait is not true end end end - # True when the Scheduler is shutdown. + # Tests whether the scheduler is shutdown. # @return [true, false, nil] def shutdown? @_shutdown end - # Restart the Scheduler. When shutdown, start; or shutdown and start. + # Restart the Scheduler. + # When shutdown, start; or shutdown and start. # @param wait [Boolean] Wait for actively executing jobs to finish # @return [void] def restart(wait: true) instrument("scheduler_restart_pools") do shutdown(wait: wait) unless shutdown? create_pools @_shutdown = false end end - # Triggers a Performer execution, if an execution thread is available. - # @param state [nil, Object] Allows Performer#next? to accept or reject the execution - # @return [nil, Boolean] if the thread was created + # Wakes a thread to allow the performer to execute a task. + # @param state [nil, Object] Contextual information for the performer. See {Performer#next?}. + # @return [nil, Boolean] Whether work was started. + # Returns +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity. + # Returns +true+ if the performer started executing work. + # Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it. def create_thread(state = nil) return nil unless @pool.running? && @pool.ready_worker_count.positive? + return false if state && !@performer.next?(state) - if state - return false unless @performer.next?(state) - end - future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| output = nil Rails.application.executor.wrap { output = performer.next } output end @@ -166,11 +179,10 @@ create_thread if output end private - # @return [void] def create_pools instrument("scheduler_create_pools", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do @pool = ThreadPoolExecutor.new(@pool_options) next unless @timer_options[:execution_interval].positive? @@ -189,12 +201,13 @@ ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block) end end - # Slightly customized sub-class of Concurrent::ThreadPoolExecutor + # Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status. + # @private class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor - # Number of idle or potential threads available to execute tasks + # Number of inactive threads available to execute tasks. # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437 # @return [Integer] def ready_worker_count synchronize do workers_still_to_be_created = @max_length - @pool.length