lib/good_job/scheduler.rb in good_job-1.2.5 vs lib/good_job/scheduler.rb in good_job-1.2.6

- old
+ new

@@ -12,30 +12,30 @@ # The scheduler is responsible for calling its performer efficiently across threads managed by an instance of +Concurrent::ThreadPoolExecutor+. # If a performer does not have work, the thread will go to sleep. # The scheduler maintains an instance of +Concurrent::TimerTask+, which wakes sleeping threads and causes them to check whether the performer has new work. # class Scheduler - # Defaults for instance of Concurrent::TimerTask. - # The timer controls how and when sleeping threads check for new work. - DEFAULT_TIMER_OPTIONS = { - execution_interval: 1, - timeout_interval: 1, - run_now: true, - }.freeze - # Defaults for instance of Concurrent::ThreadPoolExecutor # The thread pool is where work is performed. DEFAULT_POOL_OPTIONS = { name: name, min_threads: 0, - max_threads: Concurrent.processor_count, + max_threads: Configuration::DEFAULT_MAX_THREADS, auto_terminate: true, idletime: 60, max_queue: -1, fallback_policy: :discard, }.freeze + # Defaults for instance of Concurrent::TimerTask. + # The timer controls how and when sleeping threads check for new work. + DEFAULT_TIMER_OPTIONS = { + execution_interval: Configuration::DEFAULT_POLL_INTERVAL, + timeout_interval: 1, + run_now: true, + }.freeze + # @!attribute [r] instances # @!scope class # List of all instantiated Schedulers in the current process. # @return [array<GoodJob:Scheduler>] cattr_reader :instances, default: [], instance_reader: false @@ -60,39 +60,34 @@ true end end job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter) - timer_options = {} - timer_options[:execution_interval] = configuration.poll_interval - - pool_options = { - max_threads: max_threads, - } - - GoodJob::Scheduler.new(job_performer, timer_options: timer_options, pool_options: pool_options) + GoodJob::Scheduler.new(job_performer, max_threads: max_threads, poll_interval: configuration.poll_interval) end if schedulers.size > 1 GoodJob::MultiScheduler.new(schedulers) else schedulers.first end end # @param performer [GoodJob::Performer] - # @param timer_options [Hash] Options to instantiate a Concurrent::TimerTask - # @param pool_options [Hash] Options to instantiate a Concurrent::ThreadPoolExecutor - def initialize(performer, timer_options: {}, pool_options: {}) - # TODO: Replace `timer_options` and `pool_options` with only `poll_interval` and `max_threads` + # @param max_threads [Numeric, nil] the number of execution threads to use + # @param poll_interval [Numeric, nil] the number of seconds between polls for jobs + def initialize(performer, max_threads: nil, poll_interval: nil) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) self.class.instances << self @performer = performer - @pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options) - @timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options) + @timer_options = DEFAULT_TIMER_OPTIONS.dup + @timer_options[:execution_interval] = poll_interval if poll_interval.present? + + @pool_options = DEFAULT_POOL_OPTIONS.dup + @pool_options[:max_threads] = max_threads if max_threads.present? @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})" create_pools end