lib/good_job/scheduler.rb in good_job-2.7.4 vs lib/good_job/scheduler.rb in good_job-2.8.0

- old
+ new

@@ -46,26 +46,34 @@ job_performer = GoodJob::JobPerformer.new(queue_string) GoodJob::Scheduler.new( job_performer, max_threads: max_threads, max_cache: configuration.max_cache, - warm_cache_on_initialize: warm_cache_on_initialize + warm_cache_on_initialize: warm_cache_on_initialize, + cleanup_interval_seconds: configuration.cleanup_interval_seconds, + cleanup_interval_jobs: configuration.cleanup_interval_jobs ) end if schedulers.size > 1 GoodJob::MultiScheduler.new(schedulers) else schedulers.first end end + # Human readable name of the scheduler that includes configuration values. + # @return [String] + attr_reader :name + # @param performer [GoodJob::JobPerformer] # @param max_threads [Numeric, nil] number of seconds between polls for jobs # @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory # @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+ - def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false) + # @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records + # @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records + def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) self.class.instances << self @performer = performer @@ -74,12 +82,14 @@ @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup if max_threads.present? @executor_options[:max_threads] = max_threads @executor_options[:max_queue] = max_threads end - @executor_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" + @name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" + @executor_options[:name] = name + @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs) create_executor warm_cache if warm_cache_on_initialize end # Tests whether the scheduler is running. @@ -170,11 +180,18 @@ def task_observer(time, output, thread_error) error = thread_error || (output.is_a?(GoodJob::ExecutionResult) ? output.unhandled_error : nil) GoodJob._on_thread_error(error) if error instrument("finished_job_task", { result: output, error: thread_error, time: time }) - create_task if output + return unless output + + @cleanup_tracker.increment + if @cleanup_tracker.cleanup? + cleanup + else + create_task + end end # Information about the Scheduler # @return [Hash] def stats @@ -208,10 +225,28 @@ observer = lambda do |_time, _output, thread_error| GoodJob._on_thread_error(thread_error) if thread_error create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining end future.add_observer(observer, :call) + future.execute + end + # Preload existing runnable and future-scheduled jobs + # @return [void] + def cleanup + @cleanup_tracker.reset + + future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |_thr_scheduler, thr_performer| + Rails.application.executor.wrap do + thr_performer.cleanup + end + end + + observer = lambda do |_time, _output, thread_error| + GoodJob._on_thread_error(thread_error) if thread_error + create_task + end + future.add_observer(observer, :call) future.execute end private