lib/good_job/scheduler.rb in good_job-1.2.4 vs lib/good_job/scheduler.rb in good_job-1.2.5
- old
+ new
@@ -2,26 +2,30 @@
require "concurrent/timer_task"
require "concurrent/utility/processor_counter"
module GoodJob # :nodoc:
#
- # Schedulers are generic thread execution pools that are responsible for
- # periodically checking for available execution tasks, executing tasks in a
- # bounded thread-pool, and efficiently scaling execution threads.
+ # Schedulers are generic thread pools that are responsible for
+ # periodically checking for available tasks, executing tasks within a thread,
+ # and efficiently scaling active threads.
#
- # Schedulers are "generic" in the sense that they delegate task execution
- # details to a "Performer" object that responds to #next.
+ # Every scheduler has a single {Performer} that will execute tasks.
+ # The scheduler is responsible for calling its performer efficiently across threads managed by an instance of +Concurrent::ThreadPoolExecutor+.
+ # If a performer does not have work, the thread will go to sleep.
+ # The scheduler maintains an instance of +Concurrent::TimerTask+, which wakes sleeping threads and causes them to check whether the performer has new work.
#
class Scheduler
- # Defaults for instance of Concurrent::TimerTask
+ # Defaults for instance of Concurrent::TimerTask.
+ # The timer controls how and when sleeping threads check for new work.
DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
run_now: true,
}.freeze
# Defaults for instance of Concurrent::ThreadPoolExecutor
+ # The thread pool is where work is performed.
DEFAULT_POOL_OPTIONS = {
name: name,
min_threads: 0,
max_threads: Concurrent.processor_count,
auto_terminate: true,
@@ -30,15 +34,16 @@
fallback_policy: :discard,
}.freeze
# @!attribute [r] instances
# @!scope class
- # All instantiated Schedulers in the current process.
+ # List of all instantiated Schedulers in the current process.
# @return [array<GoodJob:Scheduler>]
cattr_reader :instances, default: [], instance_reader: false
# Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
+ # TODO: move this to GoodJob::Configuration
# @param configuration [GoodJob::Configuration]
# @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
def self.from_configuration(configuration)
schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
queue_string, max_threads = queue_string_and_max_threads.split(':')
@@ -76,10 +81,11 @@
# @param performer [GoodJob::Performer]
# @param timer_options [Hash] Options to instantiate a Concurrent::TimerTask
# @param pool_options [Hash] Options to instantiate a Concurrent::ThreadPoolExecutor
def initialize(performer, timer_options: {}, pool_options: {})
+ # TODO: Replace `timer_options` and `pool_options` with only `poll_interval` and `max_threads`
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
self.class.instances << self
@performer = performer
@@ -89,57 +95,64 @@
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})"
create_pools
end
- # Shut down the Scheduler.
+ # Shut down the scheduler.
+ # This stops all threads in the pool.
+ # If +wait+ is +true+, the scheduler will wait for any active tasks to finish.
+ # If +wait+ is +false+, this method will return immediately even though threads may still be running.
+ # Use {#shutdown?} to determine whether threads have stopped.
# @param wait [Boolean] Wait for actively executing jobs to finish
# @return [void]
def shutdown(wait: true)
@_shutdown = true
instrument("scheduler_shutdown_start", { wait: wait })
instrument("scheduler_shutdown", { wait: wait }) do
if @timer&.running?
@timer.shutdown
@timer.wait_for_termination if wait
+ # TODO: Should be killed if wait is not true
end
if @pool&.running?
@pool.shutdown
@pool.wait_for_termination if wait
+ # TODO: Should be killed if wait is not true
end
end
end
- # True when the Scheduler is shutdown.
+ # Tests whether the scheduler is shutdown.
# @return [true, false, nil]
def shutdown?
@_shutdown
end
- # Restart the Scheduler. When shutdown, start; or shutdown and start.
+ # Restart the Scheduler.
+ # When shutdown, start; or shutdown and start.
# @param wait [Boolean] Wait for actively executing jobs to finish
# @return [void]
def restart(wait: true)
instrument("scheduler_restart_pools") do
shutdown(wait: wait) unless shutdown?
create_pools
@_shutdown = false
end
end
- # Triggers a Performer execution, if an execution thread is available.
- # @param state [nil, Object] Allows Performer#next? to accept or reject the execution
- # @return [nil, Boolean] if the thread was created
+ # Wakes a thread to allow the performer to execute a task.
+ # @param state [nil, Object] Contextual information for the performer. See {Performer#next?}.
+ # @return [nil, Boolean] Whether work was started.
+ # Returns +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity.
+ # Returns +true+ if the performer started executing work.
+ # Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
def create_thread(state = nil)
return nil unless @pool.running? && @pool.ready_worker_count.positive?
+ return false if state && !@performer.next?(state)
- if state
- return false unless @performer.next?(state)
- end
-
future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end
@@ -166,11 +179,10 @@
create_thread if output
end
private
- # @return [void]
def create_pools
instrument("scheduler_create_pools", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do
@pool = ThreadPoolExecutor.new(@pool_options)
next unless @timer_options[:execution_interval].positive?
@@ -189,12 +201,13 @@
ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block)
end
end
- # Slightly customized sub-class of Concurrent::ThreadPoolExecutor
+ # Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status.
+ # @private
class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
- # Number of idle or potential threads available to execute tasks
+ # Number of inactive threads available to execute tasks.
# https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
# @return [Integer]
def ready_worker_count
synchronize do
workers_still_to_be_created = @max_length - @pool.length