lib/async/container/group.rb in async-container-0.14.1 vs lib/async/container/group.rb in async-container-0.15.0
- old
+ new
@@ -16,64 +16,66 @@
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
-require 'async/reactor'
-
-require_relative 'controller'
-require_relative 'statistics'
-
module Async
- # Manages a reactor within one or more threads.
module Container
class Group
def initialize
@pgid = nil
@running = {}
+
+ @queue = nil
end
def spawn(*arguments)
+ self.yield
+
if pid = ::Process.spawn(*arguments)
wait_for(pid)
end
end
def fork(&block)
+ self.yield
+
if pid = ::Process.fork(&block)
wait_for(pid)
end
end
def any?
@running.any?
end
+ def sleep(duration)
+ self.resume
+ self.suspend
+
+ Kernel::sleep(duration)
+
+ while self.wait_one(::Process::WNOHANG)
+ end
+ end
+
def wait
+ self.resume
+
while self.any?
self.wait_one
end
- rescue Interrupt
- # If the user interrupts the wait, interrupt the process group and wait for them to finish:
- self.kill(:INT)
-
- # If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below:
- wait_all
-
- raise
- ensure
- self.close
end
def kill(signal = :INT)
::Process.kill(signal, -@pgid) if @pgid
end
def stop(graceful = false)
if graceful
self.kill(:INT)
- wait_all
+ interrupt_all
end
ensure
self.close
end
@@ -83,18 +85,37 @@
rescue Errno::EPERM
# Sometimes, `kill` code can give EPERM, if any signal couldn't be delivered to a child. This might occur if an exception is thrown in the user code (e.g. within the fiber), and there are other zombie processes which haven't been reaped yet. These should be dealt with below, so it shouldn't be an issue to ignore this condition.
end
# Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller:
- wait_all
+ interrupt_all
end
protected
- def wait_all
+ def yield
+ if @queue
+ @queue << Fiber.current
+ Fiber.yield
+ end
+ end
+
+ def suspend
+ @queue ||= []
+ end
+
+ def resume
+ if @queue
+ @queue.each(&:resume)
+ @queue = nil
+ end
+ end
+
+ def interrupt_all
while self.any?
self.wait_one do |fiber, status|
begin
+ # This causes the waiting fiber to `raise Interrupt`:
fiber.resume(nil)
rescue Interrupt
# Graceful exit.
end
end