lib/async/task.rb in async-1.20.1 vs lib/async/task.rb in async-1.21.0

- old
+ new

@@ -25,17 +25,30 @@ require_relative 'condition' module Async # Raised when a task is explicitly stopped. class Stop < Exception + class Later + def initialize(task) + @task = task + end + + def alive? + true + end + + def resume + @task.stop + end + end end # A task represents the state associated with the execution of an asynchronous # block. class Task < Node extend Forwardable - + # Yield the unerlying `result` for the task. If the result # is an Exception, then that result will be raised an its # exception. # @return [Object] result of the task # @raise [Exception] if the result is an exception @@ -83,11 +96,11 @@ attr :reactor def_delegators :@reactor, :with_timeout, :timeout, :sleep # Yield back to the reactor and allow other fibers to execute. def yield - reactor.yield + Task.yield{reactor.yield} end # @attr fiber [Fiber] The fiber which is being used for the execution of this task. attr :fiber @@ -135,33 +148,38 @@ # Soon to become attr :result # Stop the task and all of its children. # @return [void] def stop - if self.stopping? - # If we are already stopping this task... don't try to stop it again. - return true - elsif self.running? - @status = :stopping - + if self.stopped? + # If we already stopped this task... don't try to stop it again: + return + end + + if self.running? if self.current? raise Stop, "Stopping current fiber!" elsif @fiber&.alive? - @fiber.resume(Stop.new) + begin + @fiber.resume(Stop.new) + rescue FiberError + @reactor << Stop::Later.new(self) + end end + else + # We are not running, but children might be, so transition directly into stopped state: + stop! end - ensure - @children&.each(&:stop) end - + # Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available. # @return [Async::Task] # @raise [RuntimeError] if task was not {set!} for the current fiber. def self.current Thread.current[:async_task] or raise RuntimeError, "No async task available!" end - + # Check if there is a task defined for the current fiber. # @return [Async::Task, nil] def self.current? Thread.current[:async_task] end @@ -173,11 +191,11 @@ # Check if the task is running. # @return [Boolean] def running? @status == :running end - + # Whether we can remove this node from the reactor graph. # @return [Boolean] def finished? super && @status != :running end @@ -215,29 +233,31 @@ logger.debug(self) {$!} end end def stop! + # logger.debug(self) {"Task was stopped with #{@children.size} children!"} @status = :stopped + @children&.each(&:stop) end def make_fiber(&block) Fiber.new do |*args| set! begin @result = yield(self, *args) @status = :complete - # logger.debug("Task #{self} completed normally.") + # logger.debug(self) {"Task was completed with #{@children.size} children!"} rescue Stop stop! rescue StandardError => error fail!(error, false) rescue Exception => exception fail!(exception, true) ensure - # logger.debug("Task #{self} closing: #{$!}") + # logger.debug(self) {"Task ensure $!=#{$!} with #{@children.size} children!"} finish! end end end @@ -252,10 +272,10 @@ # If this task was being used as a future, signal completion here: if @finished @finished.signal(@result) end end - + # Set the current fiber's `:async_task` to this task. def set! # This is actually fiber-local: Thread.current[:async_task] = self end