lib/good_job/scheduler.rb in good_job-2.7.4 vs lib/good_job/scheduler.rb in good_job-2.8.0
- old
+ new
@@ -46,26 +46,34 @@
job_performer = GoodJob::JobPerformer.new(queue_string)
GoodJob::Scheduler.new(
job_performer,
max_threads: max_threads,
max_cache: configuration.max_cache,
- warm_cache_on_initialize: warm_cache_on_initialize
+ warm_cache_on_initialize: warm_cache_on_initialize,
+ cleanup_interval_seconds: configuration.cleanup_interval_seconds,
+ cleanup_interval_jobs: configuration.cleanup_interval_jobs
)
end
if schedulers.size > 1
GoodJob::MultiScheduler.new(schedulers)
else
schedulers.first
end
end
+ # Human readable name of the scheduler that includes configuration values.
+ # @return [String]
+ attr_reader :name
+
# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
# @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+
- def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false)
+ # @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records
+ # @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records
+ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
self.class.instances << self
@performer = performer
@@ -74,12 +82,14 @@
@executor_options = DEFAULT_EXECUTOR_OPTIONS.dup
if max_threads.present?
@executor_options[:max_threads] = max_threads
@executor_options[:max_queue] = max_threads
end
- @executor_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})"
+ @name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})"
+ @executor_options[:name] = name
+ @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
create_executor
warm_cache if warm_cache_on_initialize
end
# Tests whether the scheduler is running.
@@ -170,11 +180,18 @@
def task_observer(time, output, thread_error)
error = thread_error || (output.is_a?(GoodJob::ExecutionResult) ? output.unhandled_error : nil)
GoodJob._on_thread_error(error) if error
instrument("finished_job_task", { result: output, error: thread_error, time: time })
- create_task if output
+ return unless output
+
+ @cleanup_tracker.increment
+ if @cleanup_tracker.cleanup?
+ cleanup
+ else
+ create_task
+ end
end
# Information about the Scheduler
# @return [Hash]
def stats
@@ -208,10 +225,28 @@
observer = lambda do |_time, _output, thread_error|
GoodJob._on_thread_error(thread_error) if thread_error
create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining
end
future.add_observer(observer, :call)
+ future.execute
+ end
+ # Preload existing runnable and future-scheduled jobs
+ # @return [void]
+ def cleanup
+ @cleanup_tracker.reset
+
+ future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |_thr_scheduler, thr_performer|
+ Rails.application.executor.wrap do
+ thr_performer.cleanup
+ end
+ end
+
+ observer = lambda do |_time, _output, thread_error|
+ GoodJob._on_thread_error(thread_error) if thread_error
+ create_task
+ end
+ future.add_observer(observer, :call)
future.execute
end
private