lib/tobox/pool/fiber_pool.rb in tobox-0.5.0 vs lib/tobox/pool/fiber_pool.rb in tobox-0.5.1
- old
+ new
@@ -3,37 +3,77 @@
require "timeout"
require "fiber_scheduler"
module Tobox
class FiberPool < Pool
- def initialize(_configuration)
+ def initialize(_)
Sequel.extension(:fiber_concurrency)
super
+ @fibers = []
+
+ @fiber_mtx = Mutex.new
+ @fiber_cond = ConditionVariable.new
+ @fiber_thread = nil
end
def start
@fiber_thread = Thread.start do
Thread.current.name = "tobox-fibers-thread"
- FiberScheduler do
- @workers.each_with_index do |wk, _idx|
- Fiber.schedule { do_work(wk) }
+ begin
+ FiberScheduler do
+ @fiber_mtx.synchronize do
+ @workers.each do |worker|
+ @fibers << start_fiber_worker(worker)
+ end
+ @fiber_cond.signal
+ end
end
+ rescue KillError
+ @fibers.each { |f| f.raise(KillError) }
end
end
+ @fiber_mtx.synchronize do
+ @fiber_cond.wait(@fiber_mtx)
+ end
end
def stop
shutdown_timeout = @configuration[:shutdown_timeout]
+ grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]
super
- begin
- Timeout.timeout(shutdown_timeout) { @fiber_thread.value }
- rescue Timeout::Error
- # hard exit
- @fiber_thread.raise(KillError)
- @fiber_thread.value
+ @fiber_thread.join(shutdown_timeout)
+
+ return unless @fiber_thread.alive?
+
+ @fiber_thread.raise(KillError)
+ @fiber_thread.join(grace_shutdown_timeout)
+ @fiber_thread.kill
+ @fiber_thread.join(1)
+ end
+
+ private
+
+ def start_fiber_worker(worker)
+ Fiber.schedule do
+ do_work(worker)
+
+ @fiber_mtx.synchronize do
+ @fibers.delete(Fiber.current)
+
+ if worker.finished? && @running
+ idx = @workers.index(worker)
+
+ raise Error, "worker not found" unless idx
+
+ subst_worker = Worker.new(worker.label, @configuration)
+ @workers[idx] = subst_worker
+ subst_fiber = start_fiber_worker(subst_worker)
+ @fiber_mtx.synchronize { @fibers << subst_fiber }
+ end
+ end
end
end
end
end