lib/good_job/scheduler.rb in good_job-1.0.3 vs lib/good_job/scheduler.rb in good_job-1.1.0

- old
+ new

@@ -18,46 +18,52 @@ idletime: 60, max_queue: -1, fallback_policy: :discard, }.freeze + cattr_reader :instances, default: [], instance_reader: false + def initialize(performer, timer_options: {}, pool_options: {}) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) + self.class.instances << self + @performer = performer - @pool = ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options)) - @timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do - create_thread - end - @timer.add_observer(self, :timer_observer) - @timer.execute - end + @pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options) + @timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options) - def execute + create_pools end def shutdown(wait: true) @_shutdown = true - ActiveSupport::Notifications.instrument("scheduler_start_shutdown.good_job", { wait: wait }) - ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait }) do - if @timer.running? + ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait, process_id: process_id }) + ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait, process_id: process_id }) do + if @timer&.running? @timer.shutdown @timer.wait_for_termination if wait end - if @pool.running? + if @pool&.running? @pool.shutdown @pool.wait_for_termination if wait end end end def shutdown? @_shutdown end + def restart(wait: true) + ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job", { process_id: process_id }) do + shutdown(wait: wait) unless shutdown? + create_pools + end + end + def create_thread return false unless @pool.ready_worker_count.positive? future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| output = nil @@ -67,14 +73,16 @@ future.add_observer(self, :task_observer) future.execute end def timer_observer(time, executed_task, thread_error) + GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call) ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: thread_error, time: time }) end def task_observer(time, output, thread_error) + GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call) ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time }) create_thread if output end class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor @@ -85,8 +93,29 @@ workers_created_but_waiting = @ready.length workers_still_to_be_created + workers_created_but_waiting end end + end + + private + + def create_pools + ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval], process_id: process_id }) do + @pool = ThreadPoolExecutor.new(@pool_options) + next unless @timer_options[:execution_interval].positive? + + @timer = Concurrent::TimerTask.new(@timer_options) { create_thread } + @timer.add_observer(self, :timer_observer) + @timer.execute + end + end + + def process_id + Process.pid + end + + def thread_name + Thread.current.name || Thread.current.object_id end end end