lib/async/task.rb in async-0.10.0 vs lib/async/task.rb in async-0.11.0

- old
+ new

@@ -20,18 +20,33 @@ require 'fiber' require 'forwardable' require_relative 'node' +require_relative 'condition' module Async class Interrupt < Exception end class Task < Node extend Forwardable + def self.yield + if block_given? + result = yield + else + result = Fiber.yield + end + + if result.is_a? Exception + raise result + else + return result + end + end + def initialize(ios, reactor) if parent = Task.current? super(parent) else super(reactor) @@ -44,21 +59,24 @@ @reactor = reactor @status = :running @result = nil + @condition = nil + @fiber = Fiber.new do set! begin @result = yield(*@ios.values, self) @status = :complete # Async.logger.debug("Task #{self} completed normally.") rescue Interrupt @status = :interrupted # Async.logger.debug("Task #{self} interrupted: #{$!}") - rescue Exception + rescue Exception => error + @result = error @status = :failed # Async.logger.debug("Task #{self} failed: #{$!}") raise ensure # Async.logger.debug("Task #{self} closing: #{$!}") @@ -82,14 +100,25 @@ attr :status attr :result def run @fiber.resume - - return @fiber end + def result + raise RuntimeError.new("Cannot wait on own fiber") if Fiber.current.equal?(@fiber) + + if running? + @condition ||= Condition.new + @condition.wait + else + Task.yield {@result} + end + end + + alias wait result + def stop @children.each(&:stop) if @fiber.alive? exception = Interrupt.new("Stop right now!") @@ -120,19 +149,27 @@ def self.current? Thread.current[:async_task] end + def running? + @status == :running + end + # Whether we can remove this node from the reactor graph. def finished? super && @status != :running end def close @ios.each_value(&:close) @ios = [] consume + + if @condition + @condition.signal(@result) + end end private def set!