lib/good_job/scheduler.rb in good_job-3.21.0 vs lib/good_job/scheduler.rb in good_job-3.21.1
- line present only in good_job 3.21.0 (removed)
+ line present only in good_job 3.21.1 (added)
@@ -2,11 +2,10 @@
require "concurrent/executor/thread_pool_executor"
require "concurrent/executor/timer_set"
require "concurrent/scheduled_task"
require "concurrent/utility/processor_counter"
-require 'good_job/metrics'
module GoodJob # :nodoc:
#
# Schedulers are generic thread pools that are responsible for
# periodically checking for available tasks, executing tasks within a thread,
@@ -34,37 +33,10 @@
# @!scope class
# List of all instantiated Schedulers in the current process.
# @return [Array<GoodJob::Scheduler>, nil]
cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false
- # Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
- # @param configuration [GoodJob::Configuration]
- # @param warm_cache_on_initialize [Boolean]
- # @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
- def self.from_configuration(configuration, warm_cache_on_initialize: false)
- schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
- queue_string, max_threads = queue_string_and_max_threads.split(':')
- max_threads = (max_threads || configuration.max_threads).to_i
-
- job_performer = GoodJob::JobPerformer.new(queue_string)
- GoodJob::Scheduler.new(
- job_performer,
- max_threads: max_threads,
- max_cache: configuration.max_cache,
- warm_cache_on_initialize: warm_cache_on_initialize,
- cleanup_interval_seconds: configuration.cleanup_interval_seconds,
- cleanup_interval_jobs: configuration.cleanup_interval_jobs
- )
- end
-
- if schedulers.size > 1
- GoodJob::MultiScheduler.new(schedulers)
- else
- schedulers.first
- end
- end
-
# Human readable name of the scheduler that includes configuration values.
# @return [String]
attr_reader :name
# @param performer [GoodJob::JobPerformer]
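
3.21.1 removes Scheduler.from_configuration; the queue-string parsing and MultiScheduler wrapping it did are no longer this class's responsibility, and this file's diff does not show where (if anywhere) that logic now lives. For callers that relied on the helper, here is a rough equivalent built only from the options visible in the removed code above. The `configuration` variable is an assumption, and GoodJob::Configuration's accessors are taken verbatim from the removed method body, so they may differ in other versions.

    # Hedged sketch: reproduces the behavior of the removed
    # Scheduler.from_configuration using only constructor options shown above.
    schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
      queue_string, max_threads = queue_string_and_max_threads.split(':')
      max_threads = (max_threads || configuration.max_threads).to_i

      GoodJob::Scheduler.new(
        GoodJob::JobPerformer.new(queue_string),
        max_threads: max_threads,
        max_cache: configuration.max_cache,
        cleanup_interval_seconds: configuration.cleanup_interval_seconds,
        cleanup_interval_jobs: configuration.cleanup_interval_jobs
      )
    end

    scheduler = schedulers.size > 1 ? GoodJob::MultiScheduler.new(schedulers) : schedulers.first
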
@@ -86,11 +58,10 @@
end
@name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})"
@executor_options[:name] = name
@cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
- @metrics = ::GoodJob::Metrics.new
@executor_options[:name] = name
create_executor
warm_cache if warm_cache_on_initialize
self.class.instances << self
@@ -141,29 +112,30 @@
def restart(timeout: -1)
raise ArgumentError, "Scheduler#restart cannot be called with a timeout of nil" if timeout.nil?
instrument("scheduler_restart_pools") do
shutdown(timeout: timeout)
- @metrics.reset
+ @performer.reset_stats
create_executor
warm_cache
end
end
# Wakes a thread to allow the performer to execute a task.
# @param state [Hash, nil] Contextual information for the performer. See {JobPerformer#next?}.
# @return [Boolean, nil] Whether work was started.
- #
# * +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity.
# * +true+ if the performer started executing work.
# * +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
def create_thread(state = nil)
return nil unless executor.running?
- if state
+ if state.present?
return false unless performer.next?(state)
+ fanout = state&.fetch(:fanout, nil)
+
if state[:count]
# When given state for multiple jobs, try to create a thread for each one.
# Return true if a thread can be created for all of them, nil if partial or none.
state_without_count = state.without(:count)
@@ -191,11 +163,11 @@
return nil unless executor.ready_worker_count.positive?
elsif @max_cache.positive?
return nil unless remaining_cache_count.positive?
end
- create_task(delay)
+ create_task(delay, fanout: fanout)
run_now ? true : nil
end
# Invoked on completion of ThreadPoolExecutor task
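
In create_thread (shown across the two hunks above), 3.21.1 tightens the guard from `if state` to `if state.present?` (so an empty hash behaves like no state) and extracts an optional `:fanout` flag from the state before the per-job checks; that flag is threaded through to create_task below. The documented return contract is unchanged. A hedged sketch of how a caller might interpret it; the `scheduler` variable and the state keys are illustrative, not taken from this diff:

    # Interpreting Scheduler#create_thread's documented return values.
    result = scheduler.create_thread({ queue_name: "default" }) # state keys are illustrative

    if result.nil?
      # Scheduler can't take the work right now (shut down, or pool/cache at capacity).
    elsif result
      # A worker thread was started for the hinted work.
    else
      # The performer declined the state (e.g. not a queue it watches).
    end
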
@@ -205,20 +177,10 @@
result = output.is_a?(GoodJob::ExecutionResult) ? output : nil
unhandled_error = thread_error || result&.unhandled_error
GoodJob._on_thread_error(unhandled_error) if unhandled_error
- if unhandled_error || result&.handled_error
- @metrics.increment_errored_executions
- elsif result&.unexecutable
- @metrics.increment_unexecutable_executions
- elsif result
- @metrics.increment_succeeded_executions
- else
- @metrics.increment_empty_executions
- end
-
instrument("finished_job_task", { result: output, error: thread_error, time: time })
return unless output
@cleanup_tracker.increment
if @cleanup_tracker.cleanup?
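
With the `@metrics` counters gone from task_observer (and `@metrics.reset` replaced by `@performer.reset_stats` in restart), the diff implies execution bookkeeping now lives in the performer rather than the scheduler. Below is a toy stand-in for the performer-side interface this file exercises — `name`, `next?`, `next` (now yielding whether work was found), `stats`, and `reset_stats`. It is not GoodJob's actual JobPerformer, and the counter keys are assumptions.

    # Toy stand-in for the performer interface the Scheduler relies on after 3.21.1.
    class ToyPerformer
      attr_reader :name

      def initialize(name)
        @name = name
        @counts = Hash.new(0) # counter keys below are assumptions
      end

      def next?(_state)
        true
      end

      # Runs one unit of work and yields whether anything was found, mirroring
      # the `thr_performer.next do |found| ... end` call in create_task below.
      def next
        found = rand < 0.5 # stand-in for "a job was actually dequeued"
        @counts[found ? :succeeded_executions : :empty_executions] += 1
        yield found if block_given?
        found
      end

      def stats
        { name: name }.merge(@counts)
      end

      def reset_stats
        @counts.clear
      end
    end
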
@@ -230,20 +192,21 @@
# Information about the Scheduler
# @return [Hash]
def stats
available_threads = executor.ready_worker_count
+
{
name: name,
queues: performer.name,
max_threads: @executor_options[:max_threads],
active_threads: @executor_options[:max_threads] - available_threads,
available_threads: available_threads,
max_cache: @max_cache,
active_cache: cache_count,
available_cache: remaining_cache_count,
- }.merge!(@metrics.to_h)
+ }.merge!(@performer.stats.without(:name))
end
# Preload existing runnable and future-scheduled jobs
# @return [void]
def warm_cache
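
Scheduler#stats now merges in the performer's counters (dropping the performer's `:name`, which would collide with the scheduler's own `name:` key) where it previously merged `@metrics.to_h`. A hedged example of the shape of the result: the first eight keys come straight from the method above, while the values and the trailing counter keys are illustrative.

    scheduler.stats
    # => {
    #      name: "GoodJob::Scheduler(queues=* max_threads=5)",
    #      queues: "*",
    #      max_threads: 5,
    #      active_threads: 1,
    #      available_threads: 4,
    #      max_cache: 10000,
    #      active_cache: 0,
    #      available_cache: 10000,
    #      # ...plus the performer's counters, e.g. succeeded/errored execution
    #      # counts (exact keys depend on JobPerformer and are not shown here)
    #    }
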
@@ -298,15 +261,18 @@
@executor = ThreadPoolExecutor.new(@executor_options)
end
end
# @param delay [Integer]
+ # @param fanout [Boolean] Whether to eagerly create a 2nd execution thread if a job is found.
# @return [void]
- def create_task(delay = 0)
- future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer|
+ def create_task(delay = 0, fanout: false)
+ future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Rails.application.reloader.wrap do
- thr_performer.next
+ thr_performer.next do |found|
+ thr_scheduler.create_thread({ fanout: fanout }) if found && fanout
+ end
end
end
future.add_observer(self, :task_observer)
future.execute
end
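
The functional heart of the release: create_task now hands the scheduler itself to the Concurrent::ScheduledTask, and the performer's `next` yields whether it actually found a job; when `fanout:` is set and work was found, the task immediately asks the scheduler for another thread, so a single wake-up can ramp a busy queue up to the pool's capacity. A stripped-down sketch of that feedback loop with hypothetical classes (not GoodJob's implementation, and without concurrent-ruby):

    # Minimal illustration of the fanout feedback loop (hypothetical names).
    class TinyScheduler
      def initialize(queue, max_threads:)
        @queue = queue
        @max_threads = max_threads
        @threads = []
        @mutex = Mutex.new
      end

      # Roughly parallels Scheduler#create_thread({ fanout: true }): start a
      # worker, and let that worker request another one if it finds work.
      def create_thread(fanout: false)
        @mutex.synchronize do
          return nil if @threads.count(&:alive?) >= @max_threads

          @threads << Thread.new do
            job = (@queue.pop(true) rescue nil) # non-blocking pop; nil when empty
            job&.call
            # Fanout: finding work suggests there may be more, so eagerly add
            # another worker instead of waiting for the next external wake-up.
            # If capacity blocks this, leftover jobs simply wait for the next
            # wake-up, much like GoodJob's poller/notifier would provide.
            create_thread(fanout: true) if job && fanout
          end
        end
        true
      end
    end

    queue = Queue.new
    5.times { |i| queue << -> { puts "ran job #{i}" } }

    scheduler = TinyScheduler.new(queue, max_threads: 3)
    scheduler.create_thread(fanout: true) # one wake-up ramps up toward max_threads workers
    sleep 0.2
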