lib/polyphony/core/supervisor.rb in polyphony-0.22 vs lib/polyphony/core/supervisor.rb in polyphony-0.23
- old
+ new
@@ -7,79 +7,113 @@
# Implements a supervision mechanism for controlling multiple coprocesses
class Supervisor
def initialize
@coprocesses = []
- @pending = []
+ @pending = {}
end
def await(&block)
+ @mode = :await
@supervisor_fiber = Fiber.current
block&.(self)
suspend
@coprocesses.map(&:result)
rescue Exceptions::MoveOn => e
e.value
ensure
finalize_await
end
+ alias_method :join, :await
+ def select(&block)
+ @mode = :select
+ @select_coproc = nil
+ @supervisor_fiber = Fiber.current
+ block&.(self)
+ suspend
+ [@select_coproc.result, @select_coproc]
+ rescue Exceptions::MoveOn => e
+ e.value
+ ensure
+ finalize_select
+ end
+
def finalize_await
if still_running?
stop_all_tasks
suspend
else
@supervisor_fiber = nil
end
end
+ def finalize_select
+ stop_all_tasks if still_running?
+ @supervisor_fiber = nil
+ end
+
def spin(coproc = nil, &block)
coproc = Coprocess.new(&(coproc || block)) unless coproc.is_a?(Coprocess)
@coprocesses << coproc
- @pending << coproc
+ @pending[coproc] = true
coproc.when_done { task_completed(coproc) }
coproc.run unless coproc.alive?
coproc
end
def add(coproc)
@coprocesses << coproc
- @pending << coproc
+ @pending[coproc] = true
coproc.when_done { task_completed(coproc) }
coproc.run unless coproc.alive?
coproc
end
+ alias_method :<<, :add
def still_running?
- !@coprocesses.empty?
+ !@pending.empty?
end
- def stop!(result = nil)
+ def interrupt(result = nil)
return unless @supervisor_fiber && !@stopped
@stopped = true
- @supervisor_fiber.transfer Exceptions::MoveOn.new(nil, result)
+ @supervisor_fiber.schedule Exceptions::MoveOn.new(nil, result)
end
+ alias_method :stop, :interrupt
def stop_all_tasks
exception = Exceptions::MoveOn.new
- @pending.each do |c|
- c.transfer(exception)
+ @pending.each_key do |c|
+ c.schedule(exception)
end
end
def task_completed(coprocess)
- return unless @pending.include?(coprocess)
+ return unless @pending[coprocess]
@pending.delete(coprocess)
- @supervisor_fiber&.transfer if @pending.empty?
+ return unless @pending.empty? || (@mode == :select && !@select_coproc)
+
+ @select_coproc = coprocess if @mode == :select
+ @supervisor_fiber&.schedule
end
end
# Extension for Coprocess class
class Coprocess
- def self.await(*coprocs)
- supervise do |s|
- coprocs.each { |cp| s.add cp }
+ class << self
+ def await(*coprocs)
+ supervisor = Supervisor.new
+ coprocs.each { |cp| supervisor << cp }
+ supervisor.await
+ end
+ alias_method :join, :await
+
+ def select(*coprocs)
+ supervisor = Supervisor.new
+ coprocs.each { |cp| supervisor << cp }
+ supervisor.select
end
end
end