lib/tobox/pool/fiber_pool.rb in tobox-0.6.1 vs lib/tobox/pool/fiber_pool.rb in tobox-0.7.0
- old
+ new
@@ -1,9 +1,9 @@
# frozen_string_literal: true
require "timeout"
-require "fiber_scheduler"
+require "async/scheduler"
module Tobox
class FiberPool < Pool
def initialize(_)
Sequel.extension(:fiber_concurrency)
@@ -18,22 +18,23 @@
def start
@fiber_thread = Thread.start do
Thread.current.name = "tobox-fibers-thread"
begin
- FiberScheduler do
- @fiber_mtx.synchronize do
- @workers.each do |worker|
- @fibers << start_fiber_worker(worker)
- end
- @fiber_cond.signal
+ Fiber.set_scheduler(Async::Scheduler.new)
+
+ @fiber_mtx.synchronize do
+ @workers.each do |worker|
+ @fibers << start_fiber_worker(worker)
end
+ @fiber_cond.signal
end
rescue KillError
@fibers.each { |f| f.raise(KillError) }
end
end
+
@fiber_mtx.synchronize do
@fiber_cond.wait(@fiber_mtx)
end
end
@@ -41,22 +42,33 @@
shutdown_timeout = @configuration[:shutdown_timeout]
grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]
super
- @fiber_thread.join(shutdown_timeout)
+ th = @fiber_thread
- return unless @fiber_thread.alive?
+ return unless th
- @fiber_thread.raise(KillError)
- @fiber_thread.join(grace_shutdown_timeout)
- @fiber_thread.kill
- @fiber_thread.join(1)
+ th.join(shutdown_timeout)
+
+ return unless th.alive?
+
+ th.raise(KillError)
+ th.join(grace_shutdown_timeout)
+ th.kill
+ th.join(1)
end
private
+ def handle_exception(wrk, exc)
+ # noop
+ return if exc.is_a?(::Async::Stop)
+
+ super
+ end
+
def start_fiber_worker(worker)
Fiber.schedule do
do_work(worker)
@fiber_mtx.synchronize do
@@ -65,15 +77,16 @@
if worker.finished? && @running
idx = @workers.index(worker)
raise Error, "worker not found" unless idx
- subst_worker = Worker.new(worker.label, @configuration)
+ subst_worker = @configuration.worker_class.new(worker.label, @configuration)
@workers[idx] = subst_worker
- subst_fiber = start_fiber_worker(subst_worker)
- @fiber_mtx.synchronize { @fibers << subst_fiber }
+ @fibers << start_fiber_worker(subst_worker)
end
end
+ rescue KillError
+ # noop
end
end
end
end