lib/good_job/scheduler.rb in good_job-3.15.14 vs lib/good_job/scheduler.rb in good_job-3.16.0
- removed (present only in good_job-3.15.14)
+ added (present only in good_job-3.16.0)
@@ -1,10 +1,12 @@
# frozen_string_literal: true
+
require "concurrent/executor/thread_pool_executor"
require "concurrent/executor/timer_set"
require "concurrent/scheduled_task"
require "concurrent/utility/processor_counter"
+require 'good_job/metrics'
module GoodJob # :nodoc:
#
# Schedulers are generic thread pools that are responsible for
# periodically checking for available tasks, executing tasks within a thread,
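The new require above pulls in GoodJob::Metrics, the per-scheduler counter object that task_observer and #stats use later in this diff. Below is a minimal sketch of the interface those call sites imply; only the method names are taken from this diff, while the storage and the key names returned by #to_h are assumptions, not the gem's actual implementation.

    require "concurrent"

    # Hypothetical stand-in for GoodJob::Metrics, built from the calls visible in this diff.
    class MetricsSketch
      COUNTERS = %i[empty errored succeeded unexecutable].freeze

      def initialize
        # One thread-safe counter per outcome; increments happen on executor threads.
        @counters = COUNTERS.to_h { |name| [name, Concurrent::AtomicFixnum.new] }
      end

      COUNTERS.each do |name|
        define_method("increment_#{name}_executions") { @counters[name].increment }
      end

      # Merged into Scheduler#stats below.
      def to_h
        @counters.to_h { |name, counter| [:"#{name}_executions", counter.value] }
      end

      # Called from Scheduler#restart.
      def reset
        @counters.each_value { |counter| counter.value = 0 }
      end
    end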
@@ -30,11 +32,11 @@
# @!attribute [r] instances
# @!scope class
# List of all instantiated Schedulers in the current process.
# @return [Array<GoodJob::Scheduler>, nil]
- cattr_reader :instances, default: [], instance_reader: false
+ cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false
# Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
# @param configuration [GoodJob::Configuration]
# @param warm_cache_on_initialize [Boolean]
# @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
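Switching the class-level registry from a plain Array to Concurrent::Array matters because each Scheduler appends itself from #initialize (moved to the end of the constructor in a hunk below), and schedulers can be constructed from different threads. An illustrative-only comparison, not taken from the gem:

    require "concurrent"

    instances = Concurrent::Array.new

    # Simulate several schedulers registering themselves concurrently.
    4.times.map { Thread.new { 250.times { instances << Object.new } } }.each(&:join)

    instances.size # => 1000; a plain Array can lose or corrupt appends on
                   #    Rubies without a GVL (JRuby, TruffleRuby)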
@@ -72,12 +74,10 @@
# @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records
# @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
- self.class.instances << self
-
@performer = performer
@max_cache = max_cache || 0
@executor_options = DEFAULT_EXECUTOR_OPTIONS.dup
if max_threads.present?
@@ -86,12 +86,16 @@
end
@name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})"
@executor_options[:name] = name
@cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
+ @metrics = ::GoodJob::Metrics.new
+ @executor_options[:name] = name
+
create_executor
warm_cache if warm_cache_on_initialize
+ self.class.instances << self
end
# Tests whether the scheduler is running.
# @return [Boolean, nil]
delegate :running?, to: :executor, allow_nil: true
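Two things change in #initialize across the hunks above: the scheduler now builds a GoodJob::Metrics instance, and it adds itself to Scheduler.instances only after its executor exists rather than on the first line. A usage sketch, assuming direct construction with a JobPerformer (normally this goes through Scheduler.from_configuration); the queue string is just an example:

    performer = GoodJob::JobPerformer.new("default")
    scheduler = GoodJob::Scheduler.new(performer, max_threads: 5, max_cache: 10)

    # Registration now happens at the end of #initialize, so anything iterating
    # Scheduler.instances only ever sees fully constructed schedulers.
    GoodJob::Scheduler.instances.include?(scheduler) # => true
    scheduler.running?                               # => true
    scheduler.shutdown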
@@ -137,10 +141,11 @@
def restart(timeout: -1)
raise ArgumentError, "Scheduler#restart cannot be called with a timeout of nil" if timeout.nil?
instrument("scheduler_restart_pools") do
shutdown(timeout: timeout)
+ @metrics.reset
create_executor
warm_cache
end
end
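Restart now also clears the counters between shutting the old pools down and creating new ones. A sketch of the observable effect; the :succeeded_executions key is an assumption about what Metrics#to_h returns, since that class isn't shown in this diff:

    scheduler.stats[:succeeded_executions] # => 42, say, accumulated since boot
    scheduler.restart(timeout: 30)         # shutdown -> @metrics.reset -> create_executor -> warm_cache
    scheduler.stats[:succeeded_executions] # => 0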
@@ -195,13 +200,25 @@
# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
def task_observer(time, output, thread_error)
- error = thread_error || (output.is_a?(GoodJob::ExecutionResult) ? output.unhandled_error : nil)
- GoodJob._on_thread_error(error) if error
+ result = output.is_a?(GoodJob::ExecutionResult) ? output : nil
+ unhandled_error = thread_error || result&.unhandled_error
+ GoodJob._on_thread_error(unhandled_error) if unhandled_error
+
+ if unhandled_error || result&.handled_error
+ @metrics.increment_errored_executions
+ elsif result&.unexecutable
+ @metrics.increment_unexecutable_executions
+ elsif result
+ @metrics.increment_succeeded_executions
+ else
+ @metrics.increment_empty_executions
+ end
+
instrument("finished_job_task", { result: output, error: thread_error, time: time })
return unless output
@cleanup_tracker.increment
if @cleanup_tracker.cleanup?
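The rewritten task_observer first funnels any unhandled error to GoodJob._on_thread_error, then buckets every completed task into exactly one counter. A stand-alone restatement of that branch for clarity; ExecutionResult is stubbed with a Struct here because its definition isn't part of this diff:

    Result = Struct.new(:unhandled_error, :handled_error, :unexecutable, keyword_init: true)

    def classify(output, thread_error)
      result = output.is_a?(Result) ? output : nil
      unhandled_error = thread_error || result&.unhandled_error

      if unhandled_error || result&.handled_error
        :errored        # thread_error, unhandled_error, or handled_error present
      elsif result&.unexecutable
        :unexecutable   # a result came back, but the job could not be executed
      elsif result
        :succeeded
      else
        :empty          # no ExecutionResult at all: the performer found nothing to run
      end
    end

    classify(nil, nil)                                           # => :empty
    classify(Result.new, nil)                                    # => :succeeded
    classify(Result.new(unexecutable: true), nil)                # => :unexecutable
    classify(Result.new(handled_error: StandardError.new), nil)  # => :errored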
@@ -212,19 +229,21 @@
end
# Information about the Scheduler
# @return [Hash]
def stats
+ available_threads = executor.ready_worker_count
{
- name: performer.name,
+ name: name,
+ queues: performer.name,
max_threads: @executor_options[:max_threads],
- active_threads: @executor_options[:max_threads] - executor.ready_worker_count,
- available_threads: executor.ready_worker_count,
+ active_threads: @executor_options[:max_threads] - available_threads,
+ available_threads: available_threads,
max_cache: @max_cache,
active_cache: cache_count,
available_cache: remaining_cache_count,
- }
+ }.merge!(@metrics.to_h)
end
# Preload existing runnable and future-scheduled jobs
# @return [void]
def warm_cache
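#stats now reports the scheduler's human-readable name and the queue string separately, reuses a single ready_worker_count read for both thread figures, and appends the execution counters. An example of the resulting shape; the values and the counter key names are illustrative, since Metrics#to_h isn't shown here:

    scheduler.stats
    # => {
    #      name: "GoodJob::Scheduler(queues=default max_threads=5)",
    #      queues: "default",
    #      max_threads: 5,
    #      active_threads: 1,
    #      available_threads: 4,
    #      max_cache: 10,
    #      active_cache: 2,
    #      available_cache: 8,
    #      # ...plus the execution counters merged in from @metrics.to_h
    #    }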
@@ -314,11 +333,11 @@
@max_cache - cache_count
end
# Custom sub-class of +Concurrent::ThreadPoolExecutor+ to add additional worker status.
# @private
- class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
+ class ThreadPoolExecutor < ::Concurrent::ThreadPoolExecutor
# Number of inactive threads available to execute tasks.
# https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
# @return [Integer]
def ready_worker_count
synchronize do
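The only change to this subclass is the explicit top-level ::Concurrent prefix on the superclass, which pins constant lookup to the concurrent-ruby class. A contrived illustration of what that guards against; the nested GoodJob::Concurrent module here is hypothetical and does not exist in the gem:

    require "concurrent"

    module GoodJob
      module Concurrent # imagine a nested namespace with this clashing name
      end

      begin
        Concurrent::ThreadPoolExecutor   # lexical lookup finds GoodJob::Concurrent first
      rescue NameError => e
        e.message # => "uninitialized constant GoodJob::Concurrent::ThreadPoolExecutor"
      end

      ::Concurrent::ThreadPoolExecutor   # always the top-level concurrent-ruby class
    end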
@@ -333,10 +352,10 @@
end
end
# Custom sub-class of +Concurrent::TimerSet+ for additional behavior.
# @private
- class TimerSet < Concurrent::TimerSet
+ class TimerSet < ::Concurrent::TimerSet
# Number of scheduled jobs in the queue
# @return [Integer]
def length
@queue.length
end
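The TimerSet subclass gets the same explicit ::Concurrent prefix; its #length reads the internal queue so the scheduler can report how many future-scheduled tasks are cached (the cache_count / active_cache figures above). A small sketch of that behavior:

    timer_set = GoodJob::Scheduler::TimerSet.new
    3.times { timer_set.post(60) { :future_job } } # three tasks scheduled 60s out
    timer_set.length # => 3
    timer_set.shutdown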