Sha256: 4703f8d97d504f09174e9fcf0817a147d4eaaed87fa0225ee89a9e3f6c2c73b4
Contents?: true
Size: 1.89 KB
Versions: 1
Compression:
Stored size: 1.89 KB
Contents
# frozen_string_literal: true

require "timeout"
require "async/scheduler"

module Tobox
  # Pool variant which runs each worker inside its own fiber. All fibers are scheduled
  # from a single dedicated thread that has an Async::Scheduler installed as its fiber
  # scheduler.
  #
  # NOTE(review): +@workers+, +@running+, +@configuration+, +do_work+, +KillError+ and
  # +Error+ are not defined in this file; presumably they come from Pool / the Tobox
  # namespace -- confirm against the parent class.
  class FiberPool < Pool
    # Loads Sequel's +fiber_concurrency+ extension, delegates to the parent initializer
    # (forwarding the received argument implicitly via zsuper), then sets up the
    # bookkeeping used to track the fibers and the thread hosting them.
    def initialize(_)
      Sequel.extension(:fiber_concurrency)
      super
      @fibers = []
      @fiber_mtx = Mutex.new
      @fiber_cond = ConditionVariable.new
      @fiber_thread = nil
    end

    # Spawns the thread hosting the fibers, schedules one fiber per worker, and blocks
    # the caller until all of them have been scheduled.
    def start
      # Predicate guarding the condition variable. Without it, a signal emitted before
      # the caller starts waiting would be lost (and the caller would block forever),
      # and a spurious wakeup would let the caller return too early.
      started = false

      @fiber_thread = Thread.start do
        Thread.current.name = "tobox-fibers-thread"

        begin
          Fiber.set_scheduler(Async::Scheduler.new)

          @fiber_mtx.synchronize do
            @workers.each do |worker|
              @fibers << start_fiber_worker(worker)
            end
            started = true
            @fiber_cond.signal
          end
        rescue KillError
          # propagate the kill request to every fiber tracked so far.
          @fibers.each { |f| f.raise(KillError) }
        end
      end

      @fiber_mtx.synchronize do
        @fiber_cond.wait(@fiber_mtx) until started
      end
    end

    # Stops the pool in escalating steps: delegates to the parent +stop+, waits up to
    # +:shutdown_timeout+ for the fiber thread to finish, then raises KillError in it and
    # waits up to +:grace_shutdown_timeout+, and finally kills the thread.
    def stop
      shutdown_timeout = @configuration[:shutdown_timeout]
      grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]

      super

      th = @fiber_thread

      # nothing to wait for if the pool was never started.
      return unless th

      th.join(shutdown_timeout)

      return unless th.alive?

      th.raise(KillError)
      th.join(grace_shutdown_timeout)
      th.kill
      th.join(1)
    end

    private

    # Ignores Async::Stop exceptions; any other exception is delegated to the parent
    # implementation.
    def handle_exception(wrk, exc)
      # noop
      return if exc.is_a?(::Async::Stop)

      super
    end

    # Schedules a fiber which runs +do_work+ for +worker+ and returns the value of
    # Fiber.schedule. Once +do_work+ returns, the fiber removes itself from +@fibers+;
    # if the worker reports being finished while the pool is still running, a fresh
    # worker with the same label takes its slot in +@workers+ and gets its own fiber.
    #
    # Raises Error (inside the fiber) when +worker+ can't be found in +@workers+.
    def start_fiber_worker(worker)
      Fiber.schedule do
        do_work(worker)

        @fiber_mtx.synchronize do
          @fibers.delete(Fiber.current)

          if worker.finished? && @running
            idx = @workers.index(worker)

            raise Error, "worker not found" unless idx

            subst_worker = @configuration.worker_class.new(worker.label, @configuration)
            @workers[idx] = subst_worker
            @fibers << start_fiber_worker(subst_worker)
          end
        end
      rescue KillError
        # noop
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
tobox-0.7.0 | lib/tobox/pool/fiber_pool.rb |