lib/async/scheduler.rb in async-2.15.0 vs lib/async/scheduler.rb in async-2.15.1
- old
+ new
@@ -73,34 +73,37 @@
end
# Invoked when the fiber scheduler is being closed.
#
# Executes the run loop until all tasks are finished, then closes the scheduler.
- def scheduler_close
+ def scheduler_close(error = $!)
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
- unless $!
+ unless error
self.run
end
ensure
self.close
end
- private def shutdown!
- # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
- self.stop
-
- self.run_loop do
- unless @children.nil?
- run_once!
- end
+ # Terminate all child tasks.
+ def terminate
+ # If that doesn't work, take more serious action:
+ @children&.each do |child|
+ child.terminate
end
+
+ return @children.nil?
end
# Terminate all child tasks and close the scheduler.
# @public Since `stable-v1`.
def close
- self.shutdown!
+ self.run_loop do
+ until self.terminate
+ self.run_once!
+ end
+ end
Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
ensure
# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
selector = @selector
@@ -287,25 +290,10 @@
def process_wait(pid, flags)
return @selector.process_wait(Fiber.current, pid, flags)
end
# Run one iteration of the event loop.
- # Does not handle interrupts.
- # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
- # @returns [Boolean] Whether there is more work to do.
- def run_once(timeout = nil)
- Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
-
- # If we are finished, we stop the task tree and exit:
- if self.finished?
- return false
- end
-
- return run_once!(timeout)
- end
-
- # Run one iteration of the event loop.
#
# When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks.
#
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
@@ -344,10 +332,29 @@
# The reactor still has work to do:
return true
end
+ # Run one iteration of the event loop.
+ # Does not handle interrupts.
+ # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
+ # @returns [Boolean] Whether there is more work to do.
+ def run_once(timeout = nil)
+ Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
+
+ if self.finished?
+ self.stop
+ end
+
+ # If we are finished, we stop the task tree and exit:
+ if @children.nil?
+ return false
+ end
+
+ return run_once!(timeout)
+ end
+
# Checks and clears the interrupted state of the scheduler.
# @returns [Boolean] Whether the reactor has been interrupted.
private def interrupted?
if @interrupted
@interrupted = false
@@ -361,14 +368,12 @@
return false
end
# Stop all children, including transient children, ignoring any signals.
def stop
- Thread.handle_interrupt(::SignalException => :never) do
- @children&.each do |child|
- child.stop
- end
+ @children&.each do |child|
+ child.stop
end
end
private def run_loop(&block)
interrupt = nil
@@ -380,11 +385,13 @@
# If we are finished, we need to exit:
break unless yield
end
end
rescue Interrupt => interrupt
- self.stop
+ Thread.handle_interrupt(::SignalException => :never) do
+ self.stop
+ end
retry
end
# If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too.
@@ -396,12 +403,10 @@
Kernel.raise ClosedError if @selector.nil?
initial_task = self.async(...) if block_given?
self.run_loop do
- unless self.finished?
- run_once!
- end
+ run_once
end
return initial_task
end