lib/polyphony/core/supervisor.rb in polyphony-0.24 vs lib/polyphony/core/supervisor.rb in polyphony-0.25

- old
+ new

@@ -1,39 +1,39 @@ # frozen_string_literal: true export_default :Supervisor -Coprocess = import('./coprocess') -Exceptions = import('./exceptions') +import '../extensions/fiber' +Exceptions = import './exceptions' -# Implements a supervision mechanism for controlling multiple coprocesses +# Implements a supervision mechanism for controlling multiple fibers class Supervisor def initialize - @coprocesses = [] + @fibers = [] @pending = {} end def await(&block) @mode = :await @supervisor_fiber = Fiber.current block&.(self) suspend - @coprocesses.map(&:result) + @fibers.map(&:result) rescue Exceptions::MoveOn => e e.value ensure finalize_await end alias_method :join, :await def select(&block) @mode = :select - @select_coproc = nil + @select_fiber = nil @supervisor_fiber = Fiber.current block&.(self) suspend - [@select_coproc.result, @select_coproc] + [@select_fiber.result, @select_fiber] rescue Exceptions::MoveOn => e e.value ensure finalize_select end @@ -50,25 +50,19 @@ def finalize_select stop_all_tasks if still_running? @supervisor_fiber = nil end - def spin(coproc = nil, &block) - coproc = Coprocess.new(&(coproc || block)) unless coproc.is_a?(Coprocess) - @coprocesses << coproc - @pending[coproc] = true - coproc.when_done { task_completed(coproc) } - coproc.run unless coproc.alive? - coproc + def spin(orig_caller = caller, &block) + add Fiber.spin(orig_caller, &block) end - def add(coproc) - @coprocesses << coproc - @pending[coproc] = true - coproc.when_done { task_completed(coproc) } - coproc.run unless coproc.alive? - coproc + def add(fiber) + @fibers << fiber + @pending[fiber] = true + fiber.when_done { task_completed(fiber) } + fiber end alias_method :<<, :add def still_running? !@pending.empty? @@ -87,33 +81,33 @@ @pending.each_key do |c| c.schedule(exception) end end - def task_completed(coprocess) - return unless @pending[coprocess] + def task_completed(fiber) + return unless @pending[fiber] - @pending.delete(coprocess) - return unless @pending.empty? || (@mode == :select && !@select_coproc) + @pending.delete(fiber) + return unless @pending.empty? || (@mode == :select && !@select_fiber) - @select_coproc = coprocess if @mode == :select + @select_fiber = fiber if @mode == :select @supervisor_fiber&.schedule end end -# Extension for Coprocess class -class Coprocess +# Supervision extensions for Fiber class +class ::Fiber class << self - def await(*coprocs) + def await(*fibers) supervisor = Supervisor.new - coprocs.each { |cp| supervisor << cp } + fibers.each { |f| supervisor << f } supervisor.await end alias_method :join, :await - def select(*coprocs) + def select(*fibers) supervisor = Supervisor.new - coprocs.each { |cp| supervisor << cp } + fibers.each { |f| supervisor << f } supervisor.select end end end