lib/async/task.rb in async-0.13.0 vs lib/async/task.rb in async-0.14.0
- old
+ new
@@ -51,38 +51,29 @@
raise result
else
return result
end
end
-
+
# Create a new task.
- # @param ios [Array] an array of `IO` objects such as `TCPServer`, `Socket`, etc.
- # @param reactor [Async::Reactor]
- # @return [void]
- def initialize(ios, reactor)
- if parent = Task.current?
- super(parent)
- else
- super(reactor)
- end
+ # @param reactor [Async::Reactor] the reactor this task will run within.
+ # @param parent [Async::Task] the parent task.
+ def initialize(reactor, parent = Task.current?)
+ super(parent || reactor)
- @ios = Hash[
- ios.collect{|io| [io.fileno, reactor.wrap(io, self)]}
- ]
-
@reactor = reactor
- @status = :running
+ @status = :initialized
@result = nil
@condition = nil
- @fiber = Fiber.new do
+ @fiber = Fiber.new do |args|
set!
begin
- @result = yield(*@ios.values, self)
+ @result = yield(self, *args)
@status = :complete
# Async.logger.debug("Task #{self} completed normally.")
rescue Interrupt
@status = :interrupted
# Async.logger.debug("Task #{self} interrupted: #{$!}")
@@ -91,23 +82,18 @@
@status = :failed
# Async.logger.debug("Task #{self} failed: #{$!}")
raise
ensure
# Async.logger.debug("Task #{self} closing: #{$!}")
- close
+ finish!
end
end
end
- # Show the current status of the task as a string.
- # @todo (picat) Add test for this method?
def to_s
- "#{super}[#{@status}]"
+ "<#{self.description} status=#{@status}>"
end
-
- # @attr ios [Array<IO>] All wrappers associated with this task.
- attr :ios
# @attr ios [Reactor] The reactor the task was created within.
attr :reactor
def_delegators :@reactor, :timeout, :sleep
@@ -117,14 +103,27 @@
# @attr status [Symbol] The status of the execution of the fiber, one of `:running`, `:complete`, `:interrupted`, or `:failed`.
attr :status
# Resume the execution of the task.
- def run
- @fiber.resume
+ def run(*args)
+ if @status == :initialized
+ @status = :running
+ @fiber.resume(*args)
+ else
+ raise RuntimeError, "Task already running!"
+ end
end
-
+
+ def async(*args, &block)
+ task = Task.new(@reactor, self, &block)
+
+ task.run(*args)
+
+ return task
+ end
+
# Retrieve the current result of the task. Will cause the caller to wait until result is available.
# @raise [RuntimeError] if the task's fiber is the current fiber.
# @return [Object]
def result
raise RuntimeError.new("Cannot wait on own fiber") if Fiber.current.equal?(@fiber)
@@ -148,76 +147,47 @@
exception = Interrupt.new("Stop right now!")
@fiber.resume(exception)
end
end
- # Provide a wrapper to an IO object with a Reactor.
- # @yield [Async::Wrapper] a wrapped object.
- def with(io, *args)
- wrapper = @reactor.wrap(io, self)
- yield wrapper, *args
- ensure
- wrapper.close if wrapper
- io.close if io
- end
-
- # Wrap and bind the given object to the reactor.
- # @param io the native object to bind to this task.
- # @return [Wrapper] The wrapped object.
- def bind(io)
- @ios[io.fileno] ||= @reactor.wrap(io, self)
- end
-
- # Register a given IO with given interests to be able to monitor it.
- # @param io [IO] a native io object.
- # @param interests [Symbol] One of `:r`, `:w` or `:rw`.
- # @return [NIO::Monitor]
- def register(io, interests)
- @reactor.register(io, interests)
- end
-
# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.
# @return [Async::Task]
# @raise [RuntimeError] if task was not {set!} for the current fiber.
def self.current
Thread.current[:async_task] or raise RuntimeError, "No async task available!"
end
-
-
+
# Check if there is a task defined for the current fiber.
# @return [Async::Task, nil]
def self.current?
Thread.current[:async_task]
end
-
+
# Check if the task is running.
# @return [Boolean]
def running?
@status == :running
end
-
+
# Whether we can remove this node from the reactor graph.
# @return [Boolean]
def finished?
super && @status != :running
end
-
- # Close all bound IO objects.
- def close
- @ios.each_value(&:close)
- @ios = nil
-
+
+ private
+
+ # Finish the current task, and all bound bound IO objects.
+ def finish!
# Attempt to remove this node from the task tree.
consume
# If this task was being used as a future, signal completion here:
if @condition
@condition.signal(@result)
end
end
-
- private
-
+
# Set the current fiber's `:async_task` to this task.
def set!
# This is actually fiber-local:
Thread.current[:async_task] = self
end