lib/good_job/scheduler.rb in good_job-1.1.3 vs lib/good_job/scheduler.rb in good_job-1.1.4
- old
+ new
@@ -1,27 +1,28 @@
require "concurrent/executor/thread_pool_executor"
require "concurrent/timer_task"
require "concurrent/utility/processor_counter"
module GoodJob # :nodoc:
+ #
# Schedulers are generic thread execution pools that are responsible for
# periodically checking for available execution tasks, executing tasks in a
# bounded thread-pool, and efficiently scaling execution threads.
#
# Schedulers are "generic" in the sense that they delegate task execution
# details to a "Performer" object that responds to #next.
+ #
class Scheduler
# Defaults for instance of Concurrent::TimerTask
DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
run_now: true,
}.freeze
# Defaults for instance of Concurrent::ThreadPoolExecutor
DEFAULT_POOL_OPTIONS = {
- name: 'good_job',
min_threads: 0,
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 60,
max_queue: -1,
@@ -41,14 +42,24 @@
schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
queue_string, max_threads = queue_string_and_max_threads.split(':')
max_threads = (max_threads || configuration.max_threads).to_i
job_query = GoodJob::Job.queue_string(queue_string)
- job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string)
+ parsed = GoodJob::Job.queue_parser(queue_string)
+ job_filter = proc do |state|
+ if parsed[:exclude]
+ !parsed[:exclude].include? state[:queue_name]
+ elsif parsed[:include]
+ parsed[:include].include? state[:queue_name]
+ else
+ true
+ end
+ end
+ job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)
timer_options = {}
- timer_options[:execution_interval] = configuration.poll_interval if configuration.poll_interval.positive?
+ timer_options[:execution_interval] = configuration.poll_interval
pool_options = {
max_threads: max_threads,
}
@@ -72,21 +83,23 @@
@performer = performer
@pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options)
@timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options)
+ @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})"
+
create_pools
end
# Shut down the Scheduler.
# @param wait [Boolean] Wait for actively executing jobs to finish
# @return [void]
def shutdown(wait: true)
@_shutdown = true
- ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait, process_id: process_id })
- ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait, process_id: process_id }) do
+ instrument("scheduler_shutdown_start", { wait: wait })
+ instrument("scheduler_shutdown", { wait: wait }) do
if @timer&.running?
@timer.shutdown
@timer.wait_for_termination if wait
end
@@ -105,69 +118,76 @@
# Restart the Scheduler. When shutdown, start; or shutdown and start.
# @param wait [Boolean] Wait for actively executing jobs to finish
# @return [void]
def restart(wait: true)
- ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job", { process_id: process_id }) do
+ instrument("scheduler_restart_pools") do
shutdown(wait: wait) unless shutdown?
create_pools
+ @_shutdown = false
end
end
- # Triggers the execution the Performer, if an execution thread is available.
- # @return [Boolean]
- def create_thread
- return false unless @pool.ready_worker_count.positive?
+ # Triggers a Performer execution, if an execution thread is available.
+ # @param state [nil, Object] Allows Performer#next? to accept or reject the execution
+ # @return [nil, Boolean] if the thread was created
+ def create_thread(state = nil)
+ return nil unless @pool.running? && @pool.ready_worker_count.positive?
+ if state
+ return false unless @performer.next?(state)
+ end
+
future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end
future.add_observer(self, :task_observer)
future.execute
+
true
end
# Invoked on completion of TimerTask task.
# @!visibility private
# @return [void]
def timer_observer(time, executed_task, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
- ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: thread_error, time: time })
+ instrument("finished_timer_task", { result: executed_task, error: thread_error, time: time })
end
# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
def task_observer(time, output, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
- ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time })
+ instrument("finished_job_task", { result: output, error: thread_error, time: time })
create_thread if output
end
private
# @return [void]
def create_pools
- ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval], process_id: process_id }) do
+ instrument("scheduler_create_pools", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do
@pool = ThreadPoolExecutor.new(@pool_options)
next unless @timer_options[:execution_interval].positive?
@timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end
- # @return [Integer] Current process ID
- def process_id
- Process.pid
- end
+ def instrument(name, payload = {}, &block)
+ payload = payload.reverse_merge({
+ scheduler: self,
+ process_id: GoodJob::CurrentExecution.process_id,
+ thread_name: GoodJob::CurrentExecution.thread_name,
+ })
- # @return [String] Current thread name
- def thread_name
- (Thread.current.name || Thread.current.object_id).to_s
+ ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block)
end
end
# Slightly customized sub-class of Concurrent::ThreadPoolExecutor
class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor