lib/good_job/scheduler.rb in good_job-1.6.0 vs lib/good_job/scheduler.rb in good_job-1.7.0

- old
+ new

@@ -1,7 +1,8 @@ require "concurrent/executor/thread_pool_executor" -require "concurrent/timer_task" +require "concurrent/executor/timer_set" +require "concurrent/scheduled_task" require "concurrent/utility/processor_counter" module GoodJob # :nodoc: # # Schedulers are generic thread pools that are responsible for @@ -20,11 +21,11 @@ name: name, min_threads: 0, max_threads: Configuration::DEFAULT_MAX_THREADS, auto_terminate: true, idletime: 60, - max_queue: 0, + max_queue: 1, # ideally zero, but 0 == infinite fallback_policy: :discard, }.freeze # @!attribute [r] instances # @!scope class @@ -32,18 +33,24 @@ # @return [array<GoodJob:Scheduler>] cattr_reader :instances, default: [], instance_reader: false # Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance. # @param configuration [GoodJob::Configuration] + # @param warm_cache_on_initialize [Boolean] # @return [GoodJob::Scheduler, GoodJob::MultiScheduler] - def self.from_configuration(configuration) + def self.from_configuration(configuration, warm_cache_on_initialize: true) schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads| queue_string, max_threads = queue_string_and_max_threads.split(':') max_threads = (max_threads || configuration.max_threads).to_i job_performer = GoodJob::JobPerformer.new(queue_string) - GoodJob::Scheduler.new(job_performer, max_threads: max_threads) + GoodJob::Scheduler.new( + job_performer, + max_threads: max_threads, + max_cache: configuration.max_cache, + warm_cache_on_initialize: warm_cache_on_initialize + ) end if schedulers.size > 1 GoodJob::MultiScheduler.new(schedulers) else @@ -51,22 +58,26 @@ end end # @param performer [GoodJob::JobPerformer] # @param max_threads [Numeric, nil] number of seconds between polls for jobs - def initialize(performer, max_threads: nil) + # @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory + # @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately + def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: true) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) self.class.instances << self @performer = performer + @max_cache = max_cache || 0 @pool_options = DEFAULT_POOL_OPTIONS.dup @pool_options[:max_threads] = max_threads if max_threads.present? @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})" create_pool + warm_cache if warm_cache_on_initialize end # Shut down the scheduler. # This stops all threads in the pool. # If +wait+ is +true+, the scheduler will wait for any active tasks to finish. @@ -77,10 +88,12 @@ def shutdown(wait: true) return unless @pool&.running? instrument("scheduler_shutdown_start", { wait: wait }) instrument("scheduler_shutdown", { wait: wait }) do + @timer_set.shutdown + @pool.shutdown @pool.wait_for_termination if wait # TODO: Should be killed if wait is not true end end @@ -97,32 +110,53 @@ # @return [void] def restart(wait: true) instrument("scheduler_restart_pools") do shutdown(wait: wait) unless shutdown? create_pool + warm_cache end end # Wakes a thread to allow the performer to execute a task. # @param state [nil, Object] Contextual information for the performer. See {Performer#next?}. # @return [nil, Boolean] Whether work was started. # Returns +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity. # Returns +true+ if the performer started executing work. # Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it. def create_thread(state = nil) - return nil unless @pool.running? && @pool.ready_worker_count.positive? - return false if state && !@performer.next?(state) + return nil unless @pool.running? - future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| + if state + return false unless @performer.next?(state) + + if state[:scheduled_at] + scheduled_at = if state[:scheduled_at].is_a? String + Time.zone.parse state[:scheduled_at] + else + state[:scheduled_at] + end + delay = [(scheduled_at - Time.current).to_f, 0].max + end + end + + delay ||= 0 + run_now = delay <= 0.01 + if run_now + return nil unless @pool.ready_worker_count.positive? + elsif @max_cache.positive? + return nil unless remaining_cache_count.positive? + end + + future = Concurrent::ScheduledTask.new(delay, args: [@performer], executor: @pool, timer_set: timer_set) do |performer| output = nil Rails.application.executor.wrap { output = performer.next } output end future.add_observer(self, :task_observer) future.execute - true + run_now ? true : nil end # Invoked on completion of ThreadPoolExecutor task # @!visibility private # @return [void] @@ -130,14 +164,40 @@ GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call) instrument("finished_job_task", { result: output, error: thread_error, time: time }) create_thread if output end + def warm_cache + return if @max_cache.zero? + + @performer.next_at( + limit: @max_cache, + now_limit: @pool_options[:max_threads] + ).each do |scheduled_at| + create_thread({ scheduled_at: scheduled_at }) + end + end + + def stats + { + name: @performer.name, + max_threads: @pool_options[:max_threads], + active_threads: @pool.ready_worker_count - @pool_options[:max_threads], + inactive_threads: @pool.ready_worker_count, + max_cache: @max_cache, + cache_count: cache_count, + cache_remaining: remaining_cache_count, + } + end + private + attr_reader :timer_set + def create_pool instrument("scheduler_create_pool", { performer_name: @performer.name, max_threads: @pool_options[:max_threads] }) do + @timer_set = Concurrent::TimerSet.new @pool = ThreadPoolExecutor.new(@pool_options) end end def instrument(name, payload = {}, &block) @@ -146,9 +206,17 @@ process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name, }) ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block) + end + + def cache_count + timer_set.instance_variable_get(:@queue).length + end + + def remaining_cache_count + @max_cache - cache_count end # Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status. # @private class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor