lib/async/task.rb in async-0.10.0 vs lib/async/task.rb in async-0.11.0
- old
+ new
@@ -20,18 +20,33 @@
require 'fiber'
require 'forwardable'
require_relative 'node'
+require_relative 'condition'
module Async
class Interrupt < Exception
end
class Task < Node
extend Forwardable
+ def self.yield
+ if block_given?
+ result = yield
+ else
+ result = Fiber.yield
+ end
+
+ if result.is_a? Exception
+ raise result
+ else
+ return result
+ end
+ end
+
def initialize(ios, reactor)
if parent = Task.current?
super(parent)
else
super(reactor)
@@ -44,21 +59,24 @@
@reactor = reactor
@status = :running
@result = nil
+ @condition = nil
+
@fiber = Fiber.new do
set!
begin
@result = yield(*@ios.values, self)
@status = :complete
# Async.logger.debug("Task #{self} completed normally.")
rescue Interrupt
@status = :interrupted
# Async.logger.debug("Task #{self} interrupted: #{$!}")
- rescue Exception
+ rescue Exception => error
+ @result = error
@status = :failed
# Async.logger.debug("Task #{self} failed: #{$!}")
raise
ensure
# Async.logger.debug("Task #{self} closing: #{$!}")
@@ -82,14 +100,25 @@
attr :status
attr :result
def run
@fiber.resume
-
- return @fiber
end
+ def result
+ raise RuntimeError.new("Cannot wait on own fiber") if Fiber.current.equal?(@fiber)
+
+ if running?
+ @condition ||= Condition.new
+ @condition.wait
+ else
+ Task.yield {@result}
+ end
+ end
+
+ alias wait result
+
def stop
@children.each(&:stop)
if @fiber.alive?
exception = Interrupt.new("Stop right now!")
@@ -120,19 +149,27 @@
def self.current?
Thread.current[:async_task]
end
+ def running?
+ @status == :running
+ end
+
# Whether we can remove this node from the reactor graph.
def finished?
super && @status != :running
end
def close
@ios.each_value(&:close)
@ios = []
consume
+
+ if @condition
+ @condition.signal(@result)
+ end
end
private
def set!