Sha256: 07f2a2e9da597b975fb313ef8bedd754a2fd9fa65de8ce7cbb1bca0f3688de9e
Contents?: true
Size: 1.81 KB
Versions: 4
Compression:
Stored size: 1.81 KB
Contents
# frozen_string_literal: true

require "timeout"
require "fiber_scheduler"

module Tobox
  # Pool implementation that runs each worker inside its own fiber. All the
  # fibers live in a single dedicated thread ("tobox-fibers-thread") which is
  # driven by a fiber scheduler.
  class FiberPool < Pool
    # Loads the Sequel +:fiber_concurrency+ extension, delegates to the parent
    # initializer (forwarding the received argument through the bare +super+)
    # and sets up the bookkeeping used to track the running fibers.
    def initialize(_)
      Sequel.extension(:fiber_concurrency)

      super

      @fibers = []
      @fiber_mtx = Mutex.new
      @fiber_cond = ConditionVariable.new
      @fiber_thread = nil
      # Predicate paired with @fiber_cond; flipped to true once every worker
      # fiber has been scheduled.
      @fibers_started = false
    end

    # Spawns the fibers thread, schedules one fiber per worker in it, and
    # blocks the caller until all worker fibers have been registered.
    def start
      @fibers_started = false

      @fiber_thread = Thread.start do
        Thread.current.name = "tobox-fibers-thread"

        begin
          FiberScheduler do
            @fiber_mtx.synchronize do
              @workers.each do |worker|
                @fibers << start_fiber_worker(worker)
              end

              # Set the flag before signalling, while holding the lock, so the
              # waiter can never miss the notification.
              @fibers_started = true
              @fiber_cond.signal
            end
          end
        rescue KillError
          # The thread was asked to die (see #stop): forward the kill to every
          # fiber still tracked.
          @fibers.each { |f| f.raise(KillError) }
        end
      end

      @fiber_mtx.synchronize do
        # Waiting on a predicate (instead of a bare wait) guards against both a
        # signal delivered before this thread started waiting and spurious
        # wakeups.
        @fiber_cond.wait(@fiber_mtx) until @fibers_started
      end
    end

    # Stops the pool. After the parent's #stop runs, waits up to
    # +:shutdown_timeout+ for the fibers thread to finish on its own; if it is
    # still alive, raises KillError inside it, waits up to
    # +:grace_shutdown_timeout+, and finally kills the thread.
    def stop
      shutdown_timeout = @configuration[:shutdown_timeout]
      grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]

      super

      @fiber_thread.join(shutdown_timeout)

      return unless @fiber_thread.alive?

      @fiber_thread.raise(KillError)
      @fiber_thread.join(grace_shutdown_timeout)
      @fiber_thread.kill
      @fiber_thread.join(1)
    end

    private

    # Schedules a fiber which runs +worker+ via #do_work. When the work
    # returns, the fiber unregisters itself and, if the worker reports itself
    # finished while the pool is still running, replaces it with a fresh
    # worker (same label and configuration) running in a new fiber.
    #
    # Returns the value of Fiber.schedule, which callers store in @fibers.
    #
    # Raises Error when the finished worker cannot be found in @workers.
    def start_fiber_worker(worker)
      Fiber.schedule do
        do_work(worker)

        @fiber_mtx.synchronize do
          @fibers.delete(Fiber.current)

          if worker.finished? && @running
            idx = @workers.index(worker)

            raise Error, "worker not found" unless idx

            subst_worker = Worker.new(worker.label, @configuration)
            @workers[idx] = subst_worker
            subst_fiber = start_fiber_worker(subst_worker)

            # @fiber_mtx is already held by this fiber. Mutex is not
            # reentrant, so locking it again here would raise ThreadError
            # (recursive locking); append directly instead.
            @fibers << subst_fiber
          end
        end
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
tobox-0.6.1 | lib/tobox/pool/fiber_pool.rb |
tobox-0.6.0 | lib/tobox/pool/fiber_pool.rb |
tobox-0.5.2 | lib/tobox/pool/fiber_pool.rb |
tobox-0.5.1 | lib/tobox/pool/fiber_pool.rb |