lib/async/scheduler.rb in async-2.15.0 vs lib/async/scheduler.rb in async-2.15.1

- old
+ new

@@ -73,34 +73,37 @@ end # Invoked when the fiber scheduler is being closed. # # Executes the run loop until all tasks are finished, then closes the scheduler. - def scheduler_close + def scheduler_close(error = $!) # If the execution context (thread) was handling an exception, we want to exit as quickly as possible: - unless $! + unless error self.run end ensure self.close end - private def shutdown! - # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. - self.stop - - self.run_loop do - unless @children.nil? - run_once! - end + # Terminate all child tasks. + def terminate + # If that doesn't work, take more serious action: + @children&.each do |child| + child.terminate end + + return @children.nil? end # Terminate all child tasks and close the scheduler. # @public Since `stable-v1`. def close - self.shutdown! + self.run_loop do + until self.terminate + self.run_once! + end + end Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0 ensure # We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it. selector = @selector @@ -287,25 +290,10 @@ def process_wait(pid, flags) return @selector.process_wait(Fiber.current, pid, flags) end # Run one iteration of the event loop. - # Does not handle interrupts. - # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. - # @returns [Boolean] Whether there is more work to do. - def run_once(timeout = nil) - Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? - - # If we are finished, we stop the task tree and exit: - if self.finished? - return false - end - - return run_once!(timeout) - end - - # Run one iteration of the event loop. # # When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks. # # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. # @returns [Boolean] Whether there is more work to do. @@ -344,10 +332,29 @@ # The reactor still has work to do: return true end + # Run one iteration of the event loop. + # Does not handle interrupts. + # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. + # @returns [Boolean] Whether there is more work to do. + def run_once(timeout = nil) + Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? + + if self.finished? + self.stop + end + + # If we are finished, we stop the task tree and exit: + if @children.nil? + return false + end + + return run_once!(timeout) + end + # Checks and clears the interrupted state of the scheduler. # @returns [Boolean] Whether the reactor has been interrupted. private def interrupted? if @interrupted @interrupted = false @@ -361,14 +368,12 @@ return false end # Stop all children, including transient children, ignoring any signals. def stop - Thread.handle_interrupt(::SignalException => :never) do - @children&.each do |child| - child.stop - end + @children&.each do |child| + child.stop end end private def run_loop(&block) interrupt = nil @@ -380,11 +385,13 @@ # If we are finished, we need to exit: break unless yield end end rescue Interrupt => interrupt - self.stop + Thread.handle_interrupt(::SignalException => :never) do + self.stop + end retry end # If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too. @@ -396,12 +403,10 @@ Kernel.raise ClosedError if @selector.nil? initial_task = self.async(...) if block_given? self.run_loop do - unless self.finished? - run_once! - end + run_once end return initial_task end