lib/async/task.rb in async-1.12.0 vs lib/async/task.rb in async-1.13.0
- old
+ new
@@ -26,11 +26,11 @@
module Async
# Raised when a task is explicitly stopped.
class Stop < Exception
end
-
+
# A task represents the state associated with the execution of an asynchronous
# block.
class Task < Node
extend Forwardable
@@ -55,45 +55,25 @@
end
# Create a new task.
# @param reactor [Async::Reactor] the reactor this task will run within.
# @param parent [Async::Task] the parent task.
- def initialize(reactor, parent = Task.current?)
+ # @param propagate_exceptions [Boolean] whether exceptions raised in the task will propagate up the reactor stack.
+ def initialize(reactor, parent = Task.current?, &block)
super(parent || reactor)
@reactor = reactor
@status = :initialized
@result = nil
-
@finished = nil
- @fiber = Fiber.new do |*args|
- set!
-
- begin
- @result = yield(self, *args)
- @status = :complete
- # Async.logger.debug("Task #{self} completed normally.")
- rescue Stop
- @status = :stop
- # Async.logger.debug("Task #{self} stopped: #{$!}")
- Async.logger.debug(self) {$!}
- rescue Exception => error
- @result = error
- @status = :failed
- Async.logger.debug(self) {$!}
- raise
- ensure
- # Async.logger.debug("Task #{self} closing: #{$!}")
- finish!
- end
- end
+ @fiber = make_fiber(&block)
end
def to_s
- "<#{self.description} status=#{@status}>"
+ "<#{self.description} #{@status}>"
end
# @attr ios [Reactor] The reactor the task was created within.
attr :reactor
def_delegators :@reactor, :timeout, :sleep
@@ -105,11 +85,11 @@
# @attr fiber [Fiber] The fiber which is being used for the execution of this task.
attr :fiber
def_delegators :@fiber, :alive?
- # @attr status [Symbol] The status of the execution of the fiber, one of `:running`, `:complete`, `:stopped`, or `:failed`.
+ # @attr status [Symbol] The status of the execution of the fiber, one of `:initialized`, `:running`, `:complete`, `:stopped` or `:failed`.
attr :status
# Resume the execution of the task.
def run(*args)
if @status == :initialized
@@ -128,24 +108,25 @@
return task
end
# Retrieve the current result of the task. Will cause the caller to wait until result is available.
# @raise [RuntimeError] if the task's fiber is the current fiber.
- # @return [Object]
- def result
- raise RuntimeError.new("Cannot wait on own fiber") if Fiber.current.equal?(@fiber)
+ # @return [Object] the final expression/result of the task's block.
+ def wait
+ raise RuntimeError, "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)
if running?
@finished ||= Condition.new
@finished.wait
else
- Task.yield {@result}
+ Task.yield{@result}
end
end
- alias wait result
-
+ # Deprecated.
+ alias result wait
+
# Stop the task and all of its children.
# @return [void]
def stop
@children.each(&:stop)
@@ -176,12 +157,61 @@
# Whether we can remove this node from the reactor graph.
# @return [Boolean]
def finished?
super && @status != :running
end
-
+
+ def failed?
+ @status == :failed
+ end
+
+ def stopped?
+ @status == :stopped
+ end
+
private
-
+
+ # This is a very tricky aspect of tasks to get right. I've modelled it after `Thread` but it's slightly different in that the exception can propagate back up through the reactor. If the user writes code which raises an exception, that exception should always be visible, i.e. cause a failure. If it's not visible, such code fails silently and can be very difficult to debug.
+ # As an explcit choice, the user can start a task which doesn't propagate exceptions. This only applies to `StandardError` and derived tasks. This allows tasks to internally capture their error state which is raised when invoking `Task#result` similar to how `Thread#join` works. This mode makes `Async::Task` behave more like a promise, and you would need to ensure that someone calls `Task#result` otherwise you might miss important errors.
+ def fail!(exception = nil, propagate = true)
+ @status = :failed
+ @result = exception
+
+ if propagate
+ raise
+ elsif @finished.nil?
+ # If no one has called wait, we log this as an error:
+ Async.logger.error(self) {$!}
+ else
+ Async.logger.debug(self) {$!}
+ end
+ end
+
+ def stop!
+ @status = :stopped
+ end
+
+ def make_fiber(&block)
+ Fiber.new do |*args|
+ set!
+
+ begin
+ @result = yield(self, *args)
+ @status = :complete
+ # Async.logger.debug("Task #{self} completed normally.")
+ rescue Stop
+ stop!
+ rescue StandardError => error
+ fail!(error, false)
+ rescue Exception => exception
+ fail!(exception, true)
+ ensure
+ # Async.logger.debug("Task #{self} closing: #{$!}")
+ finish!
+ end
+ end
+ end
+
# Finish the current task, and all bound bound IO objects.
def finish!
# Attempt to remove this node from the task tree.
consume