lib/good_job/scheduler.rb in good_job-1.6.0 vs lib/good_job/scheduler.rb in good_job-1.7.0
- old
+ new
@@ -1,7 +1,8 @@
require "concurrent/executor/thread_pool_executor"
-require "concurrent/timer_task"
+require "concurrent/executor/timer_set"
+require "concurrent/scheduled_task"
require "concurrent/utility/processor_counter"
module GoodJob # :nodoc:
#
# Schedulers are generic thread pools that are responsible for
@@ -20,11 +21,11 @@
name: name,
min_threads: 0,
max_threads: Configuration::DEFAULT_MAX_THREADS,
auto_terminate: true,
idletime: 60,
- max_queue: 0,
+ max_queue: 1, # ideally zero, but 0 == infinite
fallback_policy: :discard,
}.freeze
# @!attribute [r] instances
# @!scope class
@@ -32,18 +33,24 @@
# @return [Array<GoodJob::Scheduler>]
cattr_reader :instances, default: [], instance_reader: false
# Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
# @param configuration [GoodJob::Configuration]
+ # @param warm_cache_on_initialize [Boolean] whether to warm the scheduled-job cache immediately during initialization
# @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
- def self.from_configuration(configuration)
+ def self.from_configuration(configuration, warm_cache_on_initialize: true)
schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
queue_string, max_threads = queue_string_and_max_threads.split(':')
max_threads = (max_threads || configuration.max_threads).to_i
job_performer = GoodJob::JobPerformer.new(queue_string)
- GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
+ GoodJob::Scheduler.new(
+ job_performer,
+ max_threads: max_threads,
+ max_cache: configuration.max_cache,
+ warm_cache_on_initialize: warm_cache_on_initialize
+ )
end
if schedulers.size > 1
GoodJob::MultiScheduler.new(schedulers)
else
@@ -51,22 +58,26 @@
end
end
# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] maximum number of threads to use for working jobs
- def initialize(performer, max_threads: nil)
+ # @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory
+ # @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately
+ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: true)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
self.class.instances << self
@performer = performer
+ @max_cache = max_cache || 0
@pool_options = DEFAULT_POOL_OPTIONS.dup
@pool_options[:max_threads] = max_threads if max_threads.present?
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})"
create_pool
+ warm_cache if warm_cache_on_initialize
end
# Shut down the scheduler.
# This stops all threads in the pool.
# If +wait+ is +true+, the scheduler will wait for any active tasks to finish.
@@ -77,10 +88,12 @@
def shutdown(wait: true)
return unless @pool&.running?
instrument("scheduler_shutdown_start", { wait: wait })
instrument("scheduler_shutdown", { wait: wait }) do
+ @timer_set.shutdown
+
@pool.shutdown
@pool.wait_for_termination if wait
# TODO: Should be killed if wait is not true
end
end
@@ -97,32 +110,53 @@
# @return [void]
def restart(wait: true)
instrument("scheduler_restart_pools") do
shutdown(wait: wait) unless shutdown?
create_pool
+ warm_cache
end
end
# Wakes a thread to allow the performer to execute a task.
# @param state [nil, Object] Contextual information for the performer. See {Performer#next?}.
# @return [nil, Boolean] Whether work was started.
# Returns +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity.
# Returns +true+ if the performer started executing work.
# Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
def create_thread(state = nil)
- return nil unless @pool.running? && @pool.ready_worker_count.positive?
- return false if state && !@performer.next?(state)
+ return nil unless @pool.running?
- future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
+ if state
+ return false unless @performer.next?(state)
+
+ # scheduled_at may arrive as a Time-ish object or a parseable string (e.g. from a NOTIFY payload)
+ if state[:scheduled_at]
+ scheduled_at = if state[:scheduled_at].is_a? String
+ Time.zone.parse state[:scheduled_at]
+ else
+ state[:scheduled_at]
+ end
+ # Clamp to zero so past-due jobs run immediately rather than with a negative delay
+ delay = [(scheduled_at - Time.current).to_f, 0].max
+ end
+ end
+
+ delay ||= 0
+ # Jobs due within 10ms run immediately; anything later is cached in the timer set
+ run_now = delay <= 0.01
+ if run_now
+ # Immediate work needs a free worker thread...
+ return nil unless @pool.ready_worker_count.positive?
+ elsif @max_cache.positive?
+ # ...deferred work needs remaining cache capacity
+ return nil unless remaining_cache_count.positive?
+ end
+
+ future = Concurrent::ScheduledTask.new(delay, args: [@performer], executor: @pool, timer_set: timer_set) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end
future.add_observer(self, :task_observer)
future.execute
- true
+ # NOTE(review): cached (deferred) tasks return nil, matching the "no thread started now" contract documented above — confirm callers treat nil vs false correctly
+ run_now ? true : nil
end
# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
@@ -130,14 +164,40 @@
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
instrument("finished_job_task", { result: output, error: thread_error, time: time })
create_thread if output
end
+ # Preload ScheduledTasks for upcoming jobs, up to @max_cache entries.
+ # NOTE(review): now_limit presumably bounds the number of already-runnable jobs
+ # fetched, so they don't exceed the pool's thread count — confirm in JobPerformer#next_at
+ def warm_cache
+ return if @max_cache.zero?
+
+ @performer.next_at(
+ limit: @max_cache,
+ now_limit: @pool_options[:max_threads]
+ ).each do |scheduled_at|
+ create_thread({ scheduled_at: scheduled_at })
+ end
+ end
+
+ def stats
+ {
+ name: @performer.name,
+ max_threads: @pool_options[:max_threads],
+ active_threads: @pool.ready_worker_count - @pool_options[:max_threads],
+ inactive_threads: @pool.ready_worker_count,
+ max_cache: @max_cache,
+ cache_count: cache_count,
+ cache_remaining: remaining_cache_count,
+ }
+ end
+
private
+ attr_reader :timer_set
+
def create_pool
instrument("scheduler_create_pool", { performer_name: @performer.name, max_threads: @pool_options[:max_threads] }) do
+ @timer_set = Concurrent::TimerSet.new
@pool = ThreadPoolExecutor.new(@pool_options)
end
end
def instrument(name, payload = {}, &block)
@@ -146,9 +206,17 @@
process_id: GoodJob::CurrentExecution.process_id,
thread_name: GoodJob::CurrentExecution.thread_name,
})
ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block)
+ end
+
+ # Number of scheduled tasks currently cached in the timer set.
+ # NOTE(review): reaches into concurrent-ruby internals; @queue is not public API
+ # and could change between concurrent-ruby versions
+ def cache_count
+ timer_set.instance_variable_get(:@queue).length
+ end
+
+ def remaining_cache_count
+ @max_cache - cache_count
end
# Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status.
# @private
class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor