lib/good_job/scheduler.rb in good_job-1.1.3 vs lib/good_job/scheduler.rb in good_job-1.1.4

- old
+ new

@@ -1,27 +1,28 @@ require "concurrent/executor/thread_pool_executor" require "concurrent/timer_task" require "concurrent/utility/processor_counter" module GoodJob # :nodoc: + # # Schedulers are generic thread execution pools that are responsible for # periodically checking for available execution tasks, executing tasks in a # bounded thread-pool, and efficiently scaling execution threads. # # Schedulers are "generic" in the sense that they delegate task execution # details to a "Performer" object that responds to #next. + # class Scheduler # Defaults for instance of Concurrent::TimerTask DEFAULT_TIMER_OPTIONS = { execution_interval: 1, timeout_interval: 1, run_now: true, }.freeze # Defaults for instance of Concurrent::ThreadPoolExecutor DEFAULT_POOL_OPTIONS = { - name: 'good_job', min_threads: 0, max_threads: Concurrent.processor_count, auto_terminate: true, idletime: 60, max_queue: -1, @@ -41,14 +42,24 @@ schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads| queue_string, max_threads = queue_string_and_max_threads.split(':') max_threads = (max_threads || configuration.max_threads).to_i job_query = GoodJob::Job.queue_string(queue_string) - job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string) + parsed = GoodJob::Job.queue_parser(queue_string) + job_filter = proc do |state| + if parsed[:exclude] + !parsed[:exclude].include? state[:queue_name] + elsif parsed[:include] + parsed[:include].include? state[:queue_name] + else + true + end + end + job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter) timer_options = {} - timer_options[:execution_interval] = configuration.poll_interval if configuration.poll_interval.positive? + timer_options[:execution_interval] = configuration.poll_interval pool_options = { max_threads: max_threads, } @@ -72,21 +83,23 @@ @performer = performer @pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options) @timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options) + @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})" + create_pools end # Shut down the Scheduler. # @param wait [Boolean] Wait for actively executing jobs to finish # @return [void] def shutdown(wait: true) @_shutdown = true - ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait, process_id: process_id }) - ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait, process_id: process_id }) do + instrument("scheduler_shutdown_start", { wait: wait }) + instrument("scheduler_shutdown", { wait: wait }) do if @timer&.running? @timer.shutdown @timer.wait_for_termination if wait end @@ -105,69 +118,76 @@ # Restart the Scheduler. When shutdown, start; or shutdown and start. # @param wait [Boolean] Wait for actively executing jobs to finish # @return [void] def restart(wait: true) - ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job", { process_id: process_id }) do + instrument("scheduler_restart_pools") do shutdown(wait: wait) unless shutdown? create_pools + @_shutdown = false end end - # Triggers the execution the Performer, if an execution thread is available. - # @return [Boolean] - def create_thread - return false unless @pool.ready_worker_count.positive? + # Triggers a Performer execution, if an execution thread is available. + # @param state [nil, Object] Allows Performer#next? to accept or reject the execution + # @return [nil, Boolean] if the thread was created + def create_thread(state = nil) + return nil unless @pool.running? && @pool.ready_worker_count.positive? + if state + return false unless @performer.next?(state) + end + future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| output = nil Rails.application.executor.wrap { output = performer.next } output end future.add_observer(self, :task_observer) future.execute + true end # Invoked on completion of TimerTask task. # @!visibility private # @return [void] def timer_observer(time, executed_task, thread_error) GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call) - ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: thread_error, time: time }) + instrument("finished_timer_task", { result: executed_task, error: thread_error, time: time }) end # Invoked on completion of ThreadPoolExecutor task # @!visibility private # @return [void] def task_observer(time, output, thread_error) GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call) - ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time }) + instrument("finished_job_task", { result: output, error: thread_error, time: time }) create_thread if output end private # @return [void] def create_pools - ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval], process_id: process_id }) do + instrument("scheduler_create_pools", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do @pool = ThreadPoolExecutor.new(@pool_options) next unless @timer_options[:execution_interval].positive? @timer = Concurrent::TimerTask.new(@timer_options) { create_thread } @timer.add_observer(self, :timer_observer) @timer.execute end end - # @return [Integer] Current process ID - def process_id - Process.pid - end + def instrument(name, payload = {}, &block) + payload = payload.reverse_merge({ + scheduler: self, + process_id: GoodJob::CurrentExecution.process_id, + thread_name: GoodJob::CurrentExecution.thread_name, + }) - # @return [String] Current thread name - def thread_name - (Thread.current.name || Thread.current.object_id).to_s + ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block) end end # Slightly customized sub-class of Concurrent::ThreadPoolExecutor class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor