lib/tobox/pool/threaded_pool.rb in tobox-0.6.1 vs lib/tobox/pool/threaded_pool.rb in tobox-0.7.0
- old
+ new
@@ -1,22 +1,20 @@
# frozen_string_literal: true
-require "monitor"
-
module Tobox
class ThreadedPool < Pool
def initialize(_configuration)
@parent_thread = Thread.main
@threads = []
- @threads.extend(MonitorMixin)
+ @threads_mutex = Thread::Mutex.new
super
end
def start
@workers.each do |wk|
th = start_thread_worker(wk)
- @threads.synchronize do
+ @threads_mutex.synchronize do
@threads << th
end
end
end
@@ -30,11 +28,11 @@
# soft exit
join = lambda do |timeout|
start = Process.clock_gettime(::Process::CLOCK_MONOTONIC)
loop do
- terminating_th = @threads.synchronize { @threads.first }
+ terminating_th = @threads_mutex.synchronize { @threads.first }
return unless terminating_th
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
@@ -45,13 +43,13 @@
end
join.call(shutdown_timeout)
# hard exit
- @threads.synchronize { @threads.each { |th| th.raise(KillError) } }
+ @threads_mutex.synchronize { @threads.each { |th| th.raise(KillError) } }
join.call(grace_shutdown_timeout)
- @threads.synchronize { @threads.each(&:kill) }
+ @threads_mutex.synchronize { @threads.each(&:kill) }
join.call(1)
end
private
@@ -59,18 +57,18 @@
Thread.start(wrk) do |worker|
Thread.current.name = worker.label
do_work(worker)
- @threads.synchronize do
+ @threads_mutex.synchronize do
@threads.delete(Thread.current)
if worker.finished? && @running
idx = @workers.index(worker)
raise Error, "worker not found" unless idx
- subst_worker = Worker.new(worker.label, @configuration)
+ subst_worker = @configuration.worker_class.new(worker.label, @configuration)
@workers[idx] = subst_worker
subst_thread = start_thread_worker(subst_worker)
@threads << subst_thread
end
end