lib/polyphony/core/supervisor.rb in polyphony-0.22 vs lib/polyphony/core/supervisor.rb in polyphony-0.23

- old
+ new

@@ -7,79 +7,113 @@ # Implements a supervision mechanism for controlling multiple coprocesses class Supervisor def initialize @coprocesses = [] - @pending = [] + @pending = {} end def await(&block) + @mode = :await @supervisor_fiber = Fiber.current block&.(self) suspend @coprocesses.map(&:result) rescue Exceptions::MoveOn => e e.value ensure finalize_await end + alias_method :join, :await + def select(&block) + @mode = :select + @select_coproc = nil + @supervisor_fiber = Fiber.current + block&.(self) + suspend + [@select_coproc.result, @select_coproc] + rescue Exceptions::MoveOn => e + e.value + ensure + finalize_select + end + def finalize_await if still_running? stop_all_tasks suspend else @supervisor_fiber = nil end end + def finalize_select + stop_all_tasks if still_running? + @supervisor_fiber = nil + end + def spin(coproc = nil, &block) coproc = Coprocess.new(&(coproc || block)) unless coproc.is_a?(Coprocess) @coprocesses << coproc - @pending << coproc + @pending[coproc] = true coproc.when_done { task_completed(coproc) } coproc.run unless coproc.alive? coproc end def add(coproc) @coprocesses << coproc - @pending << coproc + @pending[coproc] = true coproc.when_done { task_completed(coproc) } coproc.run unless coproc.alive? coproc end + alias_method :<<, :add def still_running? - !@coprocesses.empty? + !@pending.empty? end - def stop!(result = nil) + def interrupt(result = nil) return unless @supervisor_fiber && !@stopped @stopped = true - @supervisor_fiber.transfer Exceptions::MoveOn.new(nil, result) + @supervisor_fiber.schedule Exceptions::MoveOn.new(nil, result) end + alias_method :stop, :interrupt def stop_all_tasks exception = Exceptions::MoveOn.new - @pending.each do |c| - c.transfer(exception) + @pending.each_key do |c| + c.schedule(exception) end end def task_completed(coprocess) - return unless @pending.include?(coprocess) + return unless @pending[coprocess] @pending.delete(coprocess) - @supervisor_fiber&.transfer if @pending.empty? + return unless @pending.empty? || (@mode == :select && !@select_coproc) + + @select_coproc = coprocess if @mode == :select + @supervisor_fiber&.schedule end end # Extension for Coprocess class class Coprocess - def self.await(*coprocs) - supervise do |s| - coprocs.each { |cp| s.add cp } + class << self + def await(*coprocs) + supervisor = Supervisor.new + coprocs.each { |cp| supervisor << cp } + supervisor.await + end + alias_method :join, :await + + def select(*coprocs) + supervisor = Supervisor.new + coprocs.each { |cp| supervisor << cp } + supervisor.select end end end