lib/tobox/pool/threaded_pool.rb in tobox-0.1.6 vs lib/tobox/pool/threaded_pool.rb in tobox-0.2.0

- old
+ new

@@ -1,34 +1,24 @@ # frozen_string_literal: true +require "monitor" + module Tobox class ThreadedPool < Pool - class KillError < Interrupt; end - def initialize(_configuration) + @parent_thread = Thread.main @threads = [] + @threads.extend(MonitorMixin) super - @error_handlers = Array(@configuration.lifecycle_events[:error]) end def start - @workers.each_with_index do |wk, idx| - th = Thread.start do - Thread.current.name = "tobox-worker-#{idx}" - - begin - wk.work - rescue KillError - # noop - rescue Exception => e # rubocop:disable Lint/RescueException - @error_handlers.each { |hd| hd.call(:tobox_error, e) } - raise e - end - - @threads.delete(Thread.current) + @workers.each do |wk| + th = start_thread_worker(wk) + @threads.synchronize do + @threads << th end - @threads << th end end def stop shutdown_timeout = @configuration[:shutdown_timeout] @@ -47,9 +37,34 @@ # hard exit @threads.each { |th| th.raise(KillError) } while (th = @threads.pop) th.value # waits + end + end + + private + + def start_thread_worker(wrk) + Thread.start(wrk) do |worker| + Thread.current.name = worker.label + + do_work(worker) + + @threads.synchronize do + @threads.delete(Thread.current) + + if worker.finished? && @running + idx = @workers.index(worker) + + subst_worker = Worker.new(worker.label, @configuration) + @workers[idx] = subst_worker + subst_thread = start_thread_worker(subst_worker) + @threads << subst_thread + end + # all workers went down abruply, we need to kill the process. + # @parent_thread.raise(Interrupt) if wk.finished? && @threads.empty? && @running + end end end end end