lib/async/task.rb in async-0.13.0 vs lib/async/task.rb in async-0.14.0

- old
+ new

@@ -51,38 +51,29 @@ raise result else return result end end - + # Create a new task. - # @param ios [Array] an array of `IO` objects such as `TCPServer`, `Socket`, etc. - # @param reactor [Async::Reactor] - # @return [void] - def initialize(ios, reactor) - if parent = Task.current? - super(parent) - else - super(reactor) - end + # @param reactor [Async::Reactor] the reactor this task will run within. + # @param parent [Async::Task] the parent task. + def initialize(reactor, parent = Task.current?) + super(parent || reactor) - @ios = Hash[ - ios.collect{|io| [io.fileno, reactor.wrap(io, self)]} - ] - @reactor = reactor - @status = :running + @status = :initialized @result = nil @condition = nil - @fiber = Fiber.new do + @fiber = Fiber.new do |args| set! begin - @result = yield(*@ios.values, self) + @result = yield(self, *args) @status = :complete # Async.logger.debug("Task #{self} completed normally.") rescue Interrupt @status = :interrupted # Async.logger.debug("Task #{self} interrupted: #{$!}") @@ -91,23 +82,18 @@ @status = :failed # Async.logger.debug("Task #{self} failed: #{$!}") raise ensure # Async.logger.debug("Task #{self} closing: #{$!}") - close + finish! end end end - # Show the current status of the task as a string. - # @todo (picat) Add test for this method? def to_s - "#{super}[#{@status}]" + "<#{self.description} status=#{@status}>" end - - # @attr ios [Array<IO>] All wrappers associated with this task. - attr :ios # @attr ios [Reactor] The reactor the task was created within. attr :reactor def_delegators :@reactor, :timeout, :sleep @@ -117,14 +103,27 @@ # @attr status [Symbol] The status of the execution of the fiber, one of `:running`, `:complete`, `:interrupted`, or `:failed`. attr :status # Resume the execution of the task. - def run - @fiber.resume + def run(*args) + if @status == :initialized + @status = :running + @fiber.resume(*args) + else + raise RuntimeError, "Task already running!" + end end - + + def async(*args, &block) + task = Task.new(@reactor, self, &block) + + task.run(*args) + + return task + end + # Retrieve the current result of the task. Will cause the caller to wait until result is available. # @raise [RuntimeError] if the task's fiber is the current fiber. # @return [Object] def result raise RuntimeError.new("Cannot wait on own fiber") if Fiber.current.equal?(@fiber) @@ -148,76 +147,47 @@ exception = Interrupt.new("Stop right now!") @fiber.resume(exception) end end - # Provide a wrapper to an IO object with a Reactor. - # @yield [Async::Wrapper] a wrapped object. - def with(io, *args) - wrapper = @reactor.wrap(io, self) - yield wrapper, *args - ensure - wrapper.close if wrapper - io.close if io - end - - # Wrap and bind the given object to the reactor. - # @param io the native object to bind to this task. - # @return [Wrapper] The wrapped object. - def bind(io) - @ios[io.fileno] ||= @reactor.wrap(io, self) - end - - # Register a given IO with given interests to be able to monitor it. - # @param io [IO] a native io object. - # @param interests [Symbol] One of `:r`, `:w` or `:rw`. - # @return [NIO::Monitor] - def register(io, interests) - @reactor.register(io, interests) - end - # Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available. # @return [Async::Task] # @raise [RuntimeError] if task was not {set!} for the current fiber. def self.current Thread.current[:async_task] or raise RuntimeError, "No async task available!" end - - + # Check if there is a task defined for the current fiber. # @return [Async::Task, nil] def self.current? Thread.current[:async_task] end - + # Check if the task is running. # @return [Boolean] def running? @status == :running end - + # Whether we can remove this node from the reactor graph. # @return [Boolean] def finished? super && @status != :running end - - # Close all bound IO objects. - def close - @ios.each_value(&:close) - @ios = nil - + + private + + # Finish the current task, and all bound bound IO objects. + def finish! # Attempt to remove this node from the task tree. consume # If this task was being used as a future, signal completion here: if @condition @condition.signal(@result) end end - - private - + # Set the current fiber's `:async_task` to this task. def set! # This is actually fiber-local: Thread.current[:async_task] = self end