lib/good_job/scheduler.rb in good_job-1.1.2 vs lib/good_job/scheduler.rb in good_job-1.1.3
- old
+ new
@@ -1,29 +1,44 @@
require "concurrent/executor/thread_pool_executor"
require "concurrent/timer_task"
require "concurrent/utility/processor_counter"
-module GoodJob
+module GoodJob # :nodoc:
+ # Schedulers are generic thread execution pools that are responsible for
+ # periodically checking for available execution tasks, executing tasks in a
+ # bounded thread-pool, and efficiently scaling execution threads.
+ #
+ # Schedulers are "generic" in the sense that they delegate task execution
+ # details to a "Performer" object that responds to #next.
class Scheduler
+ # Defaults for instance of Concurrent::TimerTask
DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
run_now: true,
}.freeze
+ # Defaults for instance of Concurrent::ThreadPoolExecutor
DEFAULT_POOL_OPTIONS = {
name: 'good_job',
min_threads: 0,
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 60,
max_queue: -1,
fallback_policy: :discard,
}.freeze
+ # @!attribute [r] instances
+ # @!scope class
+ # All instantiated Schedulers in the current process.
+ # @return [array<GoodJob:Scheduler>]
cattr_reader :instances, default: [], instance_reader: false
+ # Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
+ # @param configuration [GoodJob::Configuration]
+ # @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
def self.from_configuration(configuration)
schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
queue_string, max_threads = queue_string_and_max_threads.split(':')
max_threads = (max_threads || configuration.max_threads).to_i
@@ -45,10 +60,13 @@
else
schedulers.first
end
end
+ # @param performer [GoodJob::Performer]
+ # @param timer_options [Hash] Options to instantiate a Concurrent::TimerTask
+ # @param pool_options [Hash] Options to instantiate a Concurrent::ThreadPoolExecutor
def initialize(performer, timer_options: {}, pool_options: {})
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
self.class.instances << self
@@ -57,10 +75,13 @@
@timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options)
create_pools
end
+ # Shut down the Scheduler.
+ # @param wait [Boolean] Wait for actively executing jobs to finish
+ # @return [void]
def shutdown(wait: true)
@_shutdown = true
ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait, process_id: process_id })
ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait, process_id: process_id }) do
@@ -74,58 +95,61 @@
@pool.wait_for_termination if wait
end
end
end
+ # True when the Scheduler is shutdown.
+ # @return [true, false, nil]
def shutdown?
@_shutdown
end
+ # Restart the Scheduler. When shutdown, start; or shutdown and start.
+ # @param wait [Boolean] Wait for actively executing jobs to finish
+ # @return [void]
def restart(wait: true)
ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job", { process_id: process_id }) do
shutdown(wait: wait) unless shutdown?
create_pools
end
end
+ # Triggers the execution the Performer, if an execution thread is available.
+ # @return [Boolean]
def create_thread
return false unless @pool.ready_worker_count.positive?
future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end
future.add_observer(self, :task_observer)
future.execute
+ true
end
+ # Invoked on completion of TimerTask task.
+ # @!visibility private
+ # @return [void]
def timer_observer(time, executed_task, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: thread_error, time: time })
end
+ # Invoked on completion of ThreadPoolExecutor task
+ # @!visibility private
+ # @return [void]
def task_observer(time, output, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time })
create_thread if output
end
- class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
- # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
- def ready_worker_count
- synchronize do
- workers_still_to_be_created = @max_length - @pool.length
- workers_created_but_waiting = @ready.length
-
- workers_still_to_be_created + workers_created_but_waiting
- end
- end
- end
-
private
+ # @return [void]
def create_pools
ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval], process_id: process_id }) do
@pool = ThreadPoolExecutor.new(@pool_options)
next unless @timer_options[:execution_interval].positive?
@@ -133,14 +157,31 @@
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end
+ # @return [Integer] Current process ID
def process_id
Process.pid
end
+ # @return [String] Current thread name
def thread_name
- Thread.current.name || Thread.current.object_id
+ (Thread.current.name || Thread.current.object_id).to_s
+ end
+ end
+
+ # Slightly customized sub-class of Concurrent::ThreadPoolExecutor
+ class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
+ # Number of idle or potential threads available to execute tasks
+ # https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
+ # @return [Integer]
+ def ready_worker_count
+ synchronize do
+ workers_still_to_be_created = @max_length - @pool.length
+ workers_created_but_waiting = @ready.length
+
+ workers_still_to_be_created + workers_created_but_waiting
+ end
end
end
end