lib/good_job/scheduler.rb in good_job-1.2.5 vs lib/good_job/scheduler.rb in good_job-1.2.6
- old
+ new
@@ -12,30 +12,30 @@
# The scheduler is responsible for calling its performer efficiently across threads managed by an instance of +Concurrent::ThreadPoolExecutor+.
# If a performer does not have work, the thread will go to sleep.
# The scheduler maintains an instance of +Concurrent::TimerTask+, which wakes sleeping threads and causes them to check whether the performer has new work.
#
class Scheduler
- # Defaults for instance of Concurrent::TimerTask.
- # The timer controls how and when sleeping threads check for new work.
- DEFAULT_TIMER_OPTIONS = {
- execution_interval: 1,
- timeout_interval: 1,
- run_now: true,
- }.freeze
-
# Defaults for instance of Concurrent::ThreadPoolExecutor
# The thread pool is where work is performed.
DEFAULT_POOL_OPTIONS = {
name: name,
min_threads: 0,
- max_threads: Concurrent.processor_count,
+ max_threads: Configuration::DEFAULT_MAX_THREADS,
auto_terminate: true,
idletime: 60,
max_queue: -1,
fallback_policy: :discard,
}.freeze
+ # Defaults for instance of Concurrent::TimerTask.
+ # The timer controls how and when sleeping threads check for new work.
+ DEFAULT_TIMER_OPTIONS = {
+ execution_interval: Configuration::DEFAULT_POLL_INTERVAL,
+ timeout_interval: 1,
+ run_now: true,
+ }.freeze
+
# @!attribute [r] instances
# @!scope class
# List of all instantiated Schedulers in the current process.
# @return [array<GoodJob:Scheduler>]
cattr_reader :instances, default: [], instance_reader: false
@@ -60,39 +60,34 @@
true
end
end
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)
- timer_options = {}
- timer_options[:execution_interval] = configuration.poll_interval
-
- pool_options = {
- max_threads: max_threads,
- }
-
- GoodJob::Scheduler.new(job_performer, timer_options: timer_options, pool_options: pool_options)
+ GoodJob::Scheduler.new(job_performer, max_threads: max_threads, poll_interval: configuration.poll_interval)
end
if schedulers.size > 1
GoodJob::MultiScheduler.new(schedulers)
else
schedulers.first
end
end
# @param performer [GoodJob::Performer]
- # @param timer_options [Hash] Options to instantiate a Concurrent::TimerTask
- # @param pool_options [Hash] Options to instantiate a Concurrent::ThreadPoolExecutor
- def initialize(performer, timer_options: {}, pool_options: {})
- # TODO: Replace `timer_options` and `pool_options` with only `poll_interval` and `max_threads`
+ # @param max_threads [Numeric, nil] the number of execution threads to use
+ # @param poll_interval [Numeric, nil] the number of seconds between polls for jobs
+ def initialize(performer, max_threads: nil, poll_interval: nil)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
self.class.instances << self
@performer = performer
- @pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options)
- @timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options)
+ @timer_options = DEFAULT_TIMER_OPTIONS.dup
+ @timer_options[:execution_interval] = poll_interval if poll_interval.present?
+
+ @pool_options = DEFAULT_POOL_OPTIONS.dup
+ @pool_options[:max_threads] = max_threads if max_threads.present?
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})"
create_pools
end