lib/async/container/group.rb in async-container-0.16.6 vs lib/async/container/group.rb in async-container-0.16.7
- old
+ new
@@ -19,71 +19,84 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require 'fiber'
-
require 'async/clock'
require_relative 'error'
module Async
module Container
# Manages a group of running processes.
class Group
+ # Initialize an empty group.
def initialize
@running = {}
# This queue allows us to wait for processes to complete, without spawning new processes as a result.
@queue = nil
end
# @attribute [Hash<IO, Fiber>] the running tasks, indexed by IO.
attr :running
+ # Whether the group contains any running processes.
+ # @returns [Boolean]
def running?
@running.any?
end
+ # Whether the group contains any running processes.
+ # @returns [Boolean]
def any?
@running.any?
end
+ # Whether the group is empty.
+ # @returns [Boolean]
def empty?
@running.empty?
end
- # This method sleeps for at most the specified duration.
+ # Sleep for at most the specified duration until some state change occurs.
def sleep(duration)
self.resume
self.suspend
self.wait_for_children(duration)
end
+ # Begin any outstanding queued processes and wait for them indefinitely.
def wait
self.resume
while self.running?
self.wait_for_children
end
end
+ # Interrupt all running processes.
+ # This resumes the controlling fiber with an instance of {Interrupt}.
def interrupt
Async.logger.debug(self, "Sending interrupt to #{@running.size} running processes...")
@running.each_value do |fiber|
fiber.resume(Interrupt)
end
end
+ # Terminate all running processes.
+ # This resumes the controlling fiber with an instance of {Terminate}.
def terminate
Async.logger.debug(self, "Sending terminate to #{@running.size} running processes...")
@running.each_value do |fiber|
fiber.resume(Terminate)
end
end
+ # Stop all child processes using {#terminate}.
+ # @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using {#interrupt} first.
def stop(timeout = 1)
# Use a default timeout if not specified:
timeout = 1 if timeout == true
if timeout
@@ -109,10 +122,11 @@
# Wait for all children to exit:
self.wait
end
+ # Wait for a message in the specified {Channel}.
def wait_for(channel)
io = channel.in
@running[io] = Fiber.current
@@ -135,9 +149,10 @@
protected
def wait_for_children(duration = nil)
if !@running.empty?
+ # Maybe consider using a proper event loop here:
readable, _, _ = ::IO.select(@running.keys, nil, nil, duration)
readable&.each do |io|
@running[io].resume
end