lib/async/container/group.rb in async-container-0.14.1 vs lib/async/container/group.rb in async-container-0.15.0

- old
+ new

@@ -16,64 +16,66 @@ # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -require 'async/reactor' - -require_relative 'controller' -require_relative 'statistics' - module Async - # Manages a reactor within one or more threads. module Container class Group def initialize @pgid = nil @running = {} + + @queue = nil end def spawn(*arguments) + self.yield + if pid = ::Process.spawn(*arguments) wait_for(pid) end end def fork(&block) + self.yield + if pid = ::Process.fork(&block) wait_for(pid) end end def any? @running.any? end + def sleep(duration) + self.resume + self.suspend + + Kernel::sleep(duration) + + while self.wait_one(::Process::WNOHANG) + end + end + def wait + self.resume + while self.any? self.wait_one end - rescue Interrupt - # If the user interrupts the wait, interrupt the process group and wait for them to finish: - self.kill(:INT) - - # If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below: - wait_all - - raise - ensure - self.close end def kill(signal = :INT) ::Process.kill(signal, -@pgid) if @pgid end def stop(graceful = false) if graceful self.kill(:INT) - wait_all + interrupt_all end ensure self.close end @@ -83,18 +85,37 @@ rescue Errno::EPERM # Sometimes, `kill` code can give EPERM, if any signal couldn't be delivered to a child. This might occur if an exception is thrown in the user code (e.g. within the fiber), and there are other zombie processes which haven't been reaped yet. These should be dealt with below, so it shouldn't be an issue to ignore this condition. end # Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller: - wait_all + interrupt_all end protected - def wait_all + def yield + if @queue + @queue << Fiber.current + Fiber.yield + end + end + + def suspend + @queue ||= [] + end + + def resume + if @queue + @queue.each(&:resume) + @queue = nil + end + end + + def interrupt_all while self.any? self.wait_one do |fiber, status| begin + # This causes the waiting fiber to `raise Interrupt`: fiber.resume(nil) rescue Interrupt # Graceful exit. end end