lib/tobox/pool/fiber_pool.rb in tobox-0.5.0 vs lib/tobox/pool/fiber_pool.rb in tobox-0.5.1

- old
+ new

@@ -3,37 +3,77 @@ require "timeout" require "fiber_scheduler" module Tobox class FiberPool < Pool - def initialize(_configuration) + def initialize(_) Sequel.extension(:fiber_concurrency) super + @fibers = [] + + @fiber_mtx = Mutex.new + @fiber_cond = ConditionVariable.new + @fiber_thread = nil end def start @fiber_thread = Thread.start do Thread.current.name = "tobox-fibers-thread" - FiberScheduler do - @workers.each_with_index do |wk, _idx| - Fiber.schedule { do_work(wk) } + begin + FiberScheduler do + @fiber_mtx.synchronize do + @workers.each do |worker| + @fibers << start_fiber_worker(worker) + end + @fiber_cond.signal + end end + rescue KillError + @fibers.each { |f| f.raise(KillError) } end end + @fiber_mtx.synchronize do + @fiber_cond.wait(@fiber_mtx) + end end def stop shutdown_timeout = @configuration[:shutdown_timeout] + grace_shutdown_timeout = @configuration[:grace_shutdown_timeout] super - begin - Timeout.timeout(shutdown_timeout) { @fiber_thread.value } - rescue Timeout::Error - # hard exit - @fiber_thread.raise(KillError) - @fiber_thread.value + @fiber_thread.join(shutdown_timeout) + + return unless @fiber_thread.alive? + + @fiber_thread.raise(KillError) + @fiber_thread.join(grace_shutdown_timeout) + @fiber_thread.kill + @fiber_thread.join(1) + end + + private + + def start_fiber_worker(worker) + Fiber.schedule do + do_work(worker) + + @fiber_mtx.synchronize do + @fibers.delete(Fiber.current) + + if worker.finished? && @running + idx = @workers.index(worker) + + raise Error, "worker not found" unless idx + + subst_worker = Worker.new(worker.label, @configuration) + @workers[idx] = subst_worker + subst_fiber = start_fiber_worker(subst_worker) + @fiber_mtx.synchronize { @fibers << subst_fiber } + end + end end end end end