lib/async/task.rb in async-1.20.1 vs lib/async/task.rb in async-1.21.0
- old
+ new
@@ -25,17 +25,30 @@
require_relative 'condition'
module Async
# Raised when a task is explicitly stopped.
class Stop < Exception
+ class Later
+ def initialize(task)
+ @task = task
+ end
+
+ def alive?
+ true
+ end
+
+ def resume
+ @task.stop
+ end
+ end
end
# A task represents the state associated with the execution of an asynchronous
# block.
class Task < Node
extend Forwardable
-
+
# Yield the unerlying `result` for the task. If the result
# is an Exception, then that result will be raised an its
# exception.
# @return [Object] result of the task
# @raise [Exception] if the result is an exception
@@ -83,11 +96,11 @@
attr :reactor
def_delegators :@reactor, :with_timeout, :timeout, :sleep
# Yield back to the reactor and allow other fibers to execute.
def yield
- reactor.yield
+ Task.yield{reactor.yield}
end
# @attr fiber [Fiber] The fiber which is being used for the execution of this task.
attr :fiber
@@ -135,33 +148,38 @@
# Soon to become attr :result
# Stop the task and all of its children.
# @return [void]
def stop
- if self.stopping?
- # If we are already stopping this task... don't try to stop it again.
- return true
- elsif self.running?
- @status = :stopping
-
+ if self.stopped?
+ # If we already stopped this task... don't try to stop it again:
+ return
+ end
+
+ if self.running?
if self.current?
raise Stop, "Stopping current fiber!"
elsif @fiber&.alive?
- @fiber.resume(Stop.new)
+ begin
+ @fiber.resume(Stop.new)
+ rescue FiberError
+ @reactor << Stop::Later.new(self)
+ end
end
+ else
+ # We are not running, but children might be, so transition directly into stopped state:
+ stop!
end
- ensure
- @children&.each(&:stop)
end
-
+
# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.
# @return [Async::Task]
# @raise [RuntimeError] if task was not {set!} for the current fiber.
def self.current
Thread.current[:async_task] or raise RuntimeError, "No async task available!"
end
-
+
# Check if there is a task defined for the current fiber.
# @return [Async::Task, nil]
def self.current?
Thread.current[:async_task]
end
@@ -173,11 +191,11 @@
# Check if the task is running.
# @return [Boolean]
def running?
@status == :running
end
-
+
# Whether we can remove this node from the reactor graph.
# @return [Boolean]
def finished?
super && @status != :running
end
@@ -215,29 +233,31 @@
logger.debug(self) {$!}
end
end
def stop!
+ # logger.debug(self) {"Task was stopped with #{@children.size} children!"}
@status = :stopped
+ @children&.each(&:stop)
end
def make_fiber(&block)
Fiber.new do |*args|
set!
begin
@result = yield(self, *args)
@status = :complete
- # logger.debug("Task #{self} completed normally.")
+ # logger.debug(self) {"Task was completed with #{@children.size} children!"}
rescue Stop
stop!
rescue StandardError => error
fail!(error, false)
rescue Exception => exception
fail!(exception, true)
ensure
- # logger.debug("Task #{self} closing: #{$!}")
+ # logger.debug(self) {"Task ensure $!=#{$!} with #{@children.size} children!"}
finish!
end
end
end
@@ -252,10 +272,10 @@
# If this task was being used as a future, signal completion here:
if @finished
@finished.signal(@result)
end
end
-
+
# Set the current fiber's `:async_task` to this task.
def set!
# This is actually fiber-local:
Thread.current[:async_task] = self
end