lib/async/scheduler.rb in async-2.14.2 vs lib/async/scheduler.rb in async-2.15.0
- old
+ new
@@ -69,11 +69,11 @@
return @busy_time
else
return @busy_time / total_time
end
end
-
+
# Invoked when the fiber scheduler is being closed.
#
# Executes the run loop until all tasks are finished, then closes the scheduler.
def scheduler_close
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
@@ -82,38 +82,34 @@
end
ensure
self.close
end
- # Terminate the scheduler. We deliberately ignore interrupts here, as this code can be called from an interrupt, and we don't want to be interrupted while cleaning up.
- def terminate
- Thread.handle_interrupt(::Interrupt => :never) do
- super
+ private def shutdown!
+ # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
+ self.stop
+
+ self.run_loop do
+ unless @children.nil?
+ run_once!
+ end
end
end
# Terminate all child tasks and close the scheduler.
# @public Since `stable-v1`.
def close
- # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
- until self.terminate
- self.run_once!
- end
+ self.shutdown!
Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
-
- # We depend on GVL for consistency:
- # @guard.synchronize do
-
+ ensure
# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
selector = @selector
@selector = nil
selector&.close
- # end
-
consume
end
# @returns [Boolean] Whether the scheduler has been closed.
# @public Since `stable-v1`.
@@ -295,11 +291,11 @@
# Run one iteration of the event loop.
# Does not handle interrupts.
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
def run_once(timeout = nil)
- Kernel::raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
+ Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
# If we are finished, we stop the task tree and exit:
if self.finished?
return false
end
@@ -311,11 +307,11 @@
#
# When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks.
#
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
- private def run_once!(timeout = 0)
+ private def run_once!(timeout = nil)
start_time = Async::Clock.now
interval = @timers.wait_interval
# If there is no interval to wait (thus no timers), and no tasks, we could be done:
@@ -363,42 +359,53 @@
end
return false
end
- # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
- def run(...)
- Kernel::raise ClosedError if @selector.nil?
-
- initial_task = self.async(...) if block_given?
+ # Stop all children, including transient children, ignoring any signals.
+ def stop
+ Thread.handle_interrupt(::SignalException => :never) do
+ @children&.each do |child|
+ child.stop
+ end
+ end
+ end
+
+ private def run_loop(&block)
interrupt = nil
begin
# In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this.
Thread.handle_interrupt(::SignalException => :never) do
- while true
- # If we are interrupted, we need to exit:
- break if self.interrupted?
-
+ until self.interrupted?
# If we are finished, we need to exit:
- break unless self.run_once
+ break unless yield
end
end
rescue Interrupt => interrupt
- Thread.handle_interrupt(::SignalException => :never) do
- self.stop
- end
+ self.stop
retry
end
# If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too.
- Kernel.raise interrupt if interrupt
+ Kernel.raise(interrupt) if interrupt
+ end
+
+ # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
+ def run(...)
+ Kernel.raise ClosedError if @selector.nil?
+ initial_task = self.async(...) if block_given?
+
+ self.run_loop do
+ unless self.finished?
+ run_once!
+ end
+ end
+
return initial_task
- ensure
- Console.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end
# Start an asynchronous task within the specified reactor. The task will be
# executed until the first blocking call, at which point it will yield and
# and this method will return.
@@ -407,10 +414,10 @@
#
# @yields {|task| ...} Executed within the task.
# @returns [Task] The task that was scheduled into the reactor.
# @deprecated With no replacement.
def async(*arguments, **options, &block)
- Kernel::raise ClosedError if @selector.nil?
+ Kernel.raise ClosedError if @selector.nil?
task = Task.new(Task.current? || self, **options, &block)
# I want to take a moment to explain the logic of this.
# When calling an async block, we deterministically execute it until the