lib/polyphony/core/supervisor.rb in polyphony-0.24 vs lib/polyphony/core/supervisor.rb in polyphony-0.25
- old
+ new
@@ -1,39 +1,39 @@
# frozen_string_literal: true
export_default :Supervisor
-Coprocess = import('./coprocess')
-Exceptions = import('./exceptions')
+import '../extensions/fiber'
+Exceptions = import './exceptions'
-# Implements a supervision mechanism for controlling multiple coprocesses
+# Implements a supervision mechanism for controlling multiple fibers
class Supervisor
def initialize
- @coprocesses = []
+ @fibers = []
@pending = {}
end
def await(&block)
@mode = :await
@supervisor_fiber = Fiber.current
block&.(self)
suspend
- @coprocesses.map(&:result)
+ @fibers.map(&:result)
rescue Exceptions::MoveOn => e
e.value
ensure
finalize_await
end
alias_method :join, :await
def select(&block)
@mode = :select
- @select_coproc = nil
+ @select_fiber = nil
@supervisor_fiber = Fiber.current
block&.(self)
suspend
- [@select_coproc.result, @select_coproc]
+ [@select_fiber.result, @select_fiber]
rescue Exceptions::MoveOn => e
e.value
ensure
finalize_select
end
@@ -50,25 +50,19 @@
def finalize_select
stop_all_tasks if still_running?
@supervisor_fiber = nil
end
- def spin(coproc = nil, &block)
- coproc = Coprocess.new(&(coproc || block)) unless coproc.is_a?(Coprocess)
- @coprocesses << coproc
- @pending[coproc] = true
- coproc.when_done { task_completed(coproc) }
- coproc.run unless coproc.alive?
- coproc
+ def spin(orig_caller = caller, &block)
+ add Fiber.spin(orig_caller, &block)
end
- def add(coproc)
- @coprocesses << coproc
- @pending[coproc] = true
- coproc.when_done { task_completed(coproc) }
- coproc.run unless coproc.alive?
- coproc
+ def add(fiber)
+ @fibers << fiber
+ @pending[fiber] = true
+ fiber.when_done { task_completed(fiber) }
+ fiber
end
alias_method :<<, :add
def still_running?
!@pending.empty?
@@ -87,33 +81,33 @@
@pending.each_key do |c|
c.schedule(exception)
end
end
- def task_completed(coprocess)
- return unless @pending[coprocess]
+ def task_completed(fiber)
+ return unless @pending[fiber]
- @pending.delete(coprocess)
- return unless @pending.empty? || (@mode == :select && !@select_coproc)
+ @pending.delete(fiber)
+ return unless @pending.empty? || (@mode == :select && !@select_fiber)
- @select_coproc = coprocess if @mode == :select
+ @select_fiber = fiber if @mode == :select
@supervisor_fiber&.schedule
end
end
-# Extension for Coprocess class
-class Coprocess
+# Supervision extensions for Fiber class
+class ::Fiber
class << self
- def await(*coprocs)
+ def await(*fibers)
supervisor = Supervisor.new
- coprocs.each { |cp| supervisor << cp }
+ fibers.each { |f| supervisor << f }
supervisor.await
end
alias_method :join, :await
- def select(*coprocs)
+ def select(*fibers)
supervisor = Supervisor.new
- coprocs.each { |cp| supervisor << cp }
+ fibers.each { |f| supervisor << f }
supervisor.select
end
end
end