lib/async/task.rb in async-0.9.1 vs lib/async/task.rb in async-0.10.0
- old
+ new
@@ -19,56 +19,86 @@
# THE SOFTWARE.
require 'fiber'
require 'forwardable'
+require_relative 'node'
+
module Async
class Interrupt < Exception
end
- class Task
+ class Task < Node
extend Forwardable
- def initialize(ios, reactor, &block)
+ def initialize(ios, reactor)
+ if parent = Task.current?
+ super(parent)
+ else
+ super(reactor)
+ end
+
@ios = Hash[
ios.collect{|io| [io.fileno, reactor.wrap(io, self)]}
]
@reactor = reactor
+ @status = :running
+ @result = nil
+
@fiber = Fiber.new do
set!
begin
- yield(*@ios.values, self)
+ @result = yield(*@ios.values, self)
+ @status = :complete
# Async.logger.debug("Task #{self} completed normally.")
rescue Interrupt
+ @status = :interrupted
# Async.logger.debug("Task #{self} interrupted: #{$!}")
+ rescue Exception
+ @status = :failed
+ # Async.logger.debug("Task #{self} failed: #{$!}")
+ raise
ensure
+ # Async.logger.debug("Task #{self} closing: #{$!}")
close
end
end
end
+ def to_s
+ "#{super}[#{@status}]"
+ end
+
+ attr :ios
+
+ attr :reactor
def_delegators :@reactor, :timeout, :sleep
+ attr :fiber
+ def_delegators :@fiber, :alive?
+
+ attr :status
+ attr :result
+
def run
@fiber.resume
return @fiber
end
- def stop!
+ def stop
+ @children.each(&:stop)
+
if @fiber.alive?
exception = Interrupt.new("Stop right now!")
@fiber.resume(exception)
end
end
- attr :ios
- attr :reactor
-
def with(io)
wrapper = @reactor.wrap(io, self)
yield wrapper
ensure
@@ -86,14 +116,26 @@
def self.current
Thread.current[:async_task] or raise RuntimeError, "No async task available!"
end
- private
+ def self.current?
+ Thread.current[:async_task]
+ end
+ # Whether we can remove this node from the reactor graph.
+ def finished?
+ super && @status != :running
+ end
+
def close
@ios.each_value(&:close)
+ @ios = []
+
+ consume
end
+
+ private
def set!
# This is actually fiber-local:
Thread.current[:async_task] = self
end