lib/tobox/pool/threaded_pool.rb in tobox-0.1.6 vs lib/tobox/pool/threaded_pool.rb in tobox-0.2.0
- old
+ new
@@ -1,34 +1,24 @@
# frozen_string_literal: true
+require "monitor"
+
module Tobox
class ThreadedPool < Pool
- class KillError < Interrupt; end
-
def initialize(_configuration)
+ @parent_thread = Thread.main
@threads = []
+ @threads.extend(MonitorMixin)
super
- @error_handlers = Array(@configuration.lifecycle_events[:error])
end
def start
- @workers.each_with_index do |wk, idx|
- th = Thread.start do
- Thread.current.name = "tobox-worker-#{idx}"
-
- begin
- wk.work
- rescue KillError
- # noop
- rescue Exception => e # rubocop:disable Lint/RescueException
- @error_handlers.each { |hd| hd.call(:tobox_error, e) }
- raise e
- end
-
- @threads.delete(Thread.current)
+ @workers.each do |wk|
+ th = start_thread_worker(wk)
+ @threads.synchronize do
+ @threads << th
end
- @threads << th
end
end
def stop
shutdown_timeout = @configuration[:shutdown_timeout]
@@ -47,9 +37,34 @@
# hard exit
@threads.each { |th| th.raise(KillError) }
while (th = @threads.pop)
th.value # waits
+ end
+ end
+
+ private
+
+ def start_thread_worker(wrk)
+ Thread.start(wrk) do |worker|
+ Thread.current.name = worker.label
+
+ do_work(worker)
+
+ @threads.synchronize do
+ @threads.delete(Thread.current)
+
+ if worker.finished? && @running
+ idx = @workers.index(worker)
+
+ subst_worker = Worker.new(worker.label, @configuration)
+ @workers[idx] = subst_worker
+ subst_thread = start_thread_worker(subst_worker)
+ @threads << subst_thread
+ end
+ # all workers went down abruply, we need to kill the process.
+ # @parent_thread.raise(Interrupt) if wk.finished? && @threads.empty? && @running
+ end
end
end
end
end