lib/async/container/group.rb in async-container-0.16.6 vs lib/async/container/group.rb in async-container-0.16.7

- old
+ new

@@ -19,71 +19,84 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. require 'fiber' - require 'async/clock' require_relative 'error' module Async module Container # Manages a group of running processes. class Group + # Initialize an empty group. def initialize @running = {} # This queue allows us to wait for processes to complete, without spawning new processes as a result. @queue = nil end # @attribute [Hash<IO, Fiber>] the running tasks, indexed by IO. attr :running + # Whether the group contains any running processes. + # @returns [Boolean] def running? @running.any? end + # Whether the group contains any running processes. + # @returns [Boolean] def any? @running.any? end + # Whether the group is empty. + # @returns [Boolean] def empty? @running.empty? end - # This method sleeps for at most the specified duration. + # Sleep for at most the specified duration until some state change occurs. def sleep(duration) self.resume self.suspend self.wait_for_children(duration) end + # Begin any outstanding queued processes and wait for them indefinitely. def wait self.resume while self.running? self.wait_for_children end end + # Interrupt all running processes. + # This resumes the controlling fiber with an instance of {Interrupt}. def interrupt Async.logger.debug(self, "Sending interrupt to #{@running.size} running processes...") @running.each_value do |fiber| fiber.resume(Interrupt) end end + # Terminate all running processes. + # This resumes the controlling fiber with an instance of {Terminate}. def terminate Async.logger.debug(self, "Sending terminate to #{@running.size} running processes...") @running.each_value do |fiber| fiber.resume(Terminate) end end + # Stop all child processes using {#terminate}. + # @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using {#interrupt} first. def stop(timeout = 1) # Use a default timeout if not specified: timeout = 1 if timeout == true if timeout @@ -109,10 +122,11 @@ # Wait for all children to exit: self.wait end + # Wait for a message in the specified {Channel}. def wait_for(channel) io = channel.in @running[io] = Fiber.current @@ -135,9 +149,10 @@ protected def wait_for_children(duration = nil) if !@running.empty? + # Maybe consider using a proper event loop here: readable, _, _ = ::IO.select(@running.keys, nil, nil, duration) readable&.each do |io| @running[io].resume end