lib/async/task.rb in async-0.12.0 vs lib/async/task.rb in async-0.13.0

- old
+ new

@@ -23,16 +23,25 @@ require_relative 'node' require_relative 'condition' module Async + # Raised when a task is explicitly stopped. class Interrupt < Exception end - + + # A task represents the state associated with the execution of an asynchronous + # block. class Task < Node extend Forwardable - + + # Yield the unerlying `result` for the task. If the result + # is an Exception, then that result will be raised an its + # exception. + # @return [Object] result of the task + # @raise [Exception] if the result is an exception + # @yield [result] result of the task if a block if given. def self.yield if block_given? result = yield else result = Fiber.yield @@ -42,11 +51,15 @@ raise result else return result end end - + + # Create a new task. + # @param ios [Array] an array of `IO` objects such as `TCPServer`, `Socket`, etc. + # @param reactor [Async::Reactor] + # @return [void] def initialize(ios, reactor) if parent = Task.current? super(parent) else super(reactor) @@ -83,29 +96,38 @@ close end end end + # Show the current status of the task as a string. + # @todo (picat) Add test for this method? def to_s "#{super}[#{@status}]" end - + + # @attr ios [Array<IO>] All wrappers associated with this task. attr :ios + # @attr ios [Reactor] The reactor the task was created within. attr :reactor def_delegators :@reactor, :timeout, :sleep + # @attr fiber [Fiber] The fiber which is being used for the execution of this task. attr :fiber def_delegators :@fiber, :alive? + # @attr status [Symbol] The status of the execution of the fiber, one of `:running`, `:complete`, `:interrupted`, or `:failed`. attr :status - attr :result + # Resume the execution of the task. def run @fiber.resume end - + + # Retrieve the current result of the task. Will cause the caller to wait until result is available. + # @raise [RuntimeError] if the task's fiber is the current fiber. + # @return [Object] def result raise RuntimeError.new("Cannot wait on own fiber") if Fiber.current.equal?(@fiber) if running? @condition ||= Condition.new @@ -114,66 +136,89 @@ Task.yield {@result} end end alias wait result - + + # Stop the task and all of its children. + # @return [void] def stop @children.each(&:stop) if @fiber.alive? exception = Interrupt.new("Stop right now!") @fiber.resume(exception) end end - - def with(io) + + # Provide a wrapper to an IO object with a Reactor. + # @yield [Async::Wrapper] a wrapped object. + def with(io, *args) wrapper = @reactor.wrap(io, self) - - yield wrapper + yield wrapper, *args ensure - wrapper.close - io.close + wrapper.close if wrapper + io.close if io end + # Wrap and bind the given object to the reactor. + # @param io the native object to bind to this task. + # @return [Wrapper] The wrapped object. def bind(io) - @ios[io.fileno] ||= reactor.wrap(io, self) + @ios[io.fileno] ||= @reactor.wrap(io, self) end + # Register a given IO with given interests to be able to monitor it. + # @param io [IO] a native io object. + # @param interests [Symbol] One of `:r`, `:w` or `:rw`. + # @return [NIO::Monitor] def register(io, interests) @reactor.register(io, interests) end - + + # Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available. + # @return [Async::Task] + # @raise [RuntimeError] if task was not {set!} for the current fiber. def self.current Thread.current[:async_task] or raise RuntimeError, "No async task available!" end - + + + # Check if there is a task defined for the current fiber. + # @return [Async::Task, nil] def self.current? Thread.current[:async_task] end - + + # Check if the task is running. + # @return [Boolean] def running? @status == :running end # Whether we can remove this node from the reactor graph. + # @return [Boolean] def finished? super && @status != :running end + # Close all bound IO objects. def close @ios.each_value(&:close) - @ios = [] + @ios = nil + # Attempt to remove this node from the task tree. consume + # If this task was being used as a future, signal completion here: if @condition @condition.signal(@result) end end private + # Set the current fiber's `:async_task` to this task. def set! # This is actually fiber-local: Thread.current[:async_task] = self end end