lib/async/scheduler.rb in async-2.12.1 vs lib/async/scheduler.rb in async-2.13.0
- old
+ new
@@ -14,11 +14,15 @@
require 'resolv'
module Async
# Handles scheduling of fibers. Implements the fiber scheduler interface.
class Scheduler < Node
+ # Raised when an operation is attempted on a closed scheduler.
class ClosedError < RuntimeError
+ # Create a new error.
+ #
+ # @parameter message [String] The error message.
def initialize(message = "Scheduler is closed!")
super
end
end
@@ -26,10 +30,15 @@
# @public Since `stable-v1`.
def self.supported?
true
end
+ # Create a new scheduler.
+ #
+ # @public Since `stable-v1`.
+ # @parameter parent [Node | Nil] The parent node to use for task hierarchy.
+ # @parameter selector [IO::Event::Selector] The selector to use for event handling.
def initialize(parent = nil, selector: nil)
super(parent)
@selector = selector || ::IO::Event::Selector.new(Fiber.current)
@interrupted = false
@@ -61,10 +70,13 @@
else
return @busy_time / total_time
end
end
+ # Invoked when the fiber scheduler is being closed.
+ #
+ # Executes the run loop until all tasks are finished, then closes the scheduler.
def scheduler_close
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
unless $!
self.run
end
@@ -77,10 +89,11 @@
Thread.handle_interrupt(::Interrupt => :never) do
super
end
end
+ # Terminate all child tasks and close the scheduler.
# @public Since `stable-v1`.
def close
# It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
until self.terminate
self.run_once!
@@ -106,10 +119,11 @@
# @public Since `stable-v1`.
def closed?
@selector.nil?
end
+ # @returns [String] A description of the scheduler.
def to_s
"\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>"
end
# Interrupt the event loop and cause it to exit.
@@ -133,13 +147,23 @@
# @parameter fiber [Fiber | Object] The object to be resumed on the next iteration of the run-loop.
def push(fiber)
@selector.push(fiber)
end
- def raise(*arguments)
- @selector.raise(*arguments)
+ # Raise an exception on a specified fiber with the given arguments.
+ #
+ # This internally schedules the current fiber to be ready, before raising the exception, so that it will later resume execution.
+ #
+ # @parameter fiber [Fiber] The fiber to raise the exception on.
+ # @parameter *arguments [Array] The arguments to pass to the fiber.
+ def raise(...)
+ @selector.raise(...)
end
+ # Resume execution of the specified fiber.
+ #
+ # @parameter fiber [Fiber] The fiber to resume.
+ # @parameter arguments [Array] The arguments to pass to the fiber.
def resume(fiber, *arguments)
@selector.resume(fiber, *arguments)
end
# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.