lib/polyphony/core/supervisor.rb in polyphony-0.19 vs lib/polyphony/core/supervisor.rb in polyphony-0.20
- old
+ new
@@ -3,10 +3,11 @@
export_default :Supervisor
Coprocess = import('./coprocess')
Exceptions = import('./exceptions')
+# Implements a supervision mechanism for controlling multiple coprocesses
class Supervisor
def initialize
@coprocesses = []
end
@@ -15,10 +16,14 @@
block&.(self)
suspend
rescue Exceptions::MoveOn => e
e.value
ensure
+ finalize_await
+ end
+
+ def finalize_await
if still_running?
stop_all_tasks
suspend
else
@supervisor_fiber = nil
@@ -27,34 +32,34 @@
def spin(proc = nil, &block)
proc = Coprocess.new(&(proc || block)) unless proc.is_a?(Coprocess)
@coprocesses << proc
proc.when_done { task_completed(proc) }
- proc.run unless proc.running?
+ proc.run unless proc.alive?
proc
end
def still_running?
!@coprocesses.empty?
end
def stop!(result = nil)
return unless @supervisor_fiber && !@stopped
-
+
@stopped = true
@supervisor_fiber.transfer Exceptions::MoveOn.new(nil, result)
end
def stop_all_tasks
- exception = Exceptions::Stop.new
+ exception = Exceptions::MoveOn.new
@coprocesses.each do |c|
- EV.next_tick { c.interrupt(exception) }
+ c.transfer(exception)
end
end
def task_completed(coprocess)
return unless @coprocesses.include?(coprocess)
-
+
@coprocesses.delete(coprocess)
@supervisor_fiber&.transfer if @coprocesses.empty?
end
end