lib/async/scheduler.rb in async-2.14.2 vs lib/async/scheduler.rb in async-2.15.0

- old
+ new

@@ -69,11 +69,11 @@ return @busy_time else return @busy_time / total_time end end - + # Invoked when the fiber scheduler is being closed. # # Executes the run loop until all tasks are finished, then closes the scheduler. def scheduler_close # If the execution context (thread) was handling an exception, we want to exit as quickly as possible: @@ -82,38 +82,34 @@ end ensure self.close end - # Terminate the scheduler. We deliberately ignore interrupts here, as this code can be called from an interrupt, and we don't want to be interrupted while cleaning up. - def terminate - Thread.handle_interrupt(::Interrupt => :never) do - super + private def shutdown! + # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. + self.stop + + self.run_loop do + unless @children.nil? + run_once! + end end end # Terminate all child tasks and close the scheduler. # @public Since `stable-v1`. def close - # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. - until self.terminate - self.run_once! - end + self.shutdown! Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0 - - # We depend on GVL for consistency: - # @guard.synchronize do - + ensure # We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it. selector = @selector @selector = nil selector&.close - # end - consume end # @returns [Boolean] Whether the scheduler has been closed. # @public Since `stable-v1`. @@ -295,11 +291,11 @@ # Run one iteration of the event loop. # Does not handle interrupts. # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. # @returns [Boolean] Whether there is more work to do. def run_once(timeout = nil) - Kernel::raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? + Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? # If we are finished, we stop the task tree and exit: if self.finished? return false end @@ -311,11 +307,11 @@ # # When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks. # # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. # @returns [Boolean] Whether there is more work to do. - private def run_once!(timeout = 0) + private def run_once!(timeout = nil) start_time = Async::Clock.now interval = @timers.wait_interval # If there is no interval to wait (thus no timers), and no tasks, we could be done: @@ -363,42 +359,53 @@ end return false end - # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided. - def run(...) - Kernel::raise ClosedError if @selector.nil? - - initial_task = self.async(...) if block_given? + # Stop all children, including transient children, ignoring any signals. + def stop + Thread.handle_interrupt(::SignalException => :never) do + @children&.each do |child| + child.stop + end + end + end + + private def run_loop(&block) interrupt = nil begin # In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this. Thread.handle_interrupt(::SignalException => :never) do - while true - # If we are interrupted, we need to exit: - break if self.interrupted? - + until self.interrupted? # If we are finished, we need to exit: - break unless self.run_once + break unless yield end end rescue Interrupt => interrupt - Thread.handle_interrupt(::SignalException => :never) do - self.stop - end + self.stop retry end # If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too. - Kernel.raise interrupt if interrupt + Kernel.raise(interrupt) if interrupt + end + + # Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided. + def run(...) + Kernel.raise ClosedError if @selector.nil? + initial_task = self.async(...) if block_given? + + self.run_loop do + unless self.finished? + run_once! + end + end + return initial_task - ensure - Console.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."} end # Start an asynchronous task within the specified reactor. The task will be # executed until the first blocking call, at which point it will yield and # and this method will return. @@ -407,10 +414,10 @@ # # @yields {|task| ...} Executed within the task. # @returns [Task] The task that was scheduled into the reactor. # @deprecated With no replacement. def async(*arguments, **options, &block) - Kernel::raise ClosedError if @selector.nil? + Kernel.raise ClosedError if @selector.nil? task = Task.new(Task.current? || self, **options, &block) # I want to take a moment to explain the logic of this. # When calling an async block, we deterministically execute it until the