lib/async/task.rb in async-0.9.1 vs lib/async/task.rb in async-0.10.0

- old
+ new

@@ -19,56 +19,86 @@ # THE SOFTWARE. require 'fiber' require 'forwardable' +require_relative 'node' + module Async class Interrupt < Exception end - class Task + class Task < Node extend Forwardable - def initialize(ios, reactor, &block) + def initialize(ios, reactor) + if parent = Task.current? + super(parent) + else + super(reactor) + end + @ios = Hash[ ios.collect{|io| [io.fileno, reactor.wrap(io, self)]} ] @reactor = reactor + @status = :running + @result = nil + @fiber = Fiber.new do set! begin - yield(*@ios.values, self) + @result = yield(*@ios.values, self) + @status = :complete # Async.logger.debug("Task #{self} completed normally.") rescue Interrupt + @status = :interrupted # Async.logger.debug("Task #{self} interrupted: #{$!}") + rescue Exception + @status = :failed + # Async.logger.debug("Task #{self} failed: #{$!}") + raise ensure + # Async.logger.debug("Task #{self} closing: #{$!}") close end end end + def to_s + "#{super}[#{@status}]" + end + + attr :ios + + attr :reactor def_delegators :@reactor, :timeout, :sleep + attr :fiber + def_delegators :@fiber, :alive? + + attr :status + attr :result + def run @fiber.resume return @fiber end - def stop! + def stop + @children.each(&:stop) + if @fiber.alive? exception = Interrupt.new("Stop right now!") @fiber.resume(exception) end end - attr :ios - attr :reactor - def with(io) wrapper = @reactor.wrap(io, self) yield wrapper ensure @@ -86,14 +116,26 @@ def self.current Thread.current[:async_task] or raise RuntimeError, "No async task available!" end - private + def self.current? + Thread.current[:async_task] + end + # Whether we can remove this node from the reactor graph. + def finished? + super && @status != :running + end + def close @ios.each_value(&:close) + @ios = [] + + consume end + + private def set! # This is actually fiber-local: Thread.current[:async_task] = self end