lib/polyphony/extensions/thread.rb in polyphony-1.5 vs lib/polyphony/extensions/thread.rb in polyphony-1.6
- old
+ new
@@ -12,11 +12,10 @@
# Initializes the thread.
# @param args [Array] arguments to pass to thread block
def initialize(*args, &block)
@join_wait_queue = []
- @finalization_mutex = Mutex.new
@args = args
@block = block
orig_initialize { execute }
end
@@ -39,20 +38,11 @@
# terminated without propagating the timeout exception.
#
# @param timeout [Number] timeout interval
# @return [any] thread's return value
def join(timeout = nil)
- watcher = Fiber.current.auto_watcher
-
- @finalization_mutex.synchronize do
- if @terminated
- @result.is_a?(Exception) ? (raise @result) : (return @result)
- else
- @join_wait_queue << watcher
- end
- end
- timeout ? move_on_after(timeout) { watcher.await } : watcher.await
+ timeout ? move_on_after(timeout) { await_done } : await_done
end
alias_method :await, :join
# @!visibility private
alias_method :orig_raise, :raise
@@ -60,31 +50,28 @@
# Raises an exception in the context of the thread. If no exception is given,
# a `RuntimeError` is raised.
#
# @param error [Exception, Class, nil] exception spec
def raise(error = nil)
- Thread.pass until @main_fiber
- error = RuntimeError.new if error.nil?
- error = RuntimeError.new(error) if error.is_a?(String)
- error = error.new if error.is_a?(Class)
-
sleep 0.0001 until @ready
- main_fiber&.raise(error)
+
+ error = Exception.instantiate(error)
+ if Thread.current == self
+ Kernel.raise(error)
+ else
+ @main_fiber&.raise(error)
+ end
end
# @!visibility private
alias_method :orig_kill, :kill
# Terminates the thread.
#
# @return [Thread] self
- def kill
- return self if @terminated
+ alias_method :kill, :kill_safe
- raise Polyphony::Terminate
- end
-
# @!visibility private
alias_method :orig_inspect, :inspect
# Returns a string representation of the thread for debugging purposes.
#
@@ -126,10 +113,15 @@
# @return [Proc] idle handler
def on_idle(&block)
backend.idle_proc = block
end
+ def value
+ join
+ @result.is_a?(Exception) ? raise(@result) : @result
+ end
+
private
# Runs the thread's block, handling any uncaught exceptions.
#
# @return [any] thread result value
@@ -157,16 +149,16 @@
# Finalizes the thread.
#
# @param result [any] thread's return value
def finalize(result)
+ # We need to make sure the fiber is not on the runqueue. This, in order to
+ # prevent a race condition between #finalize and #kill.
+ fiber_unschedule(Fiber.current)
Fiber.current.shutdown_all_children if !Fiber.current.children.empty?
- @finalization_mutex.synchronize do
- @terminated = true
- @result = result
- signal_waiters(result)
- end
+ @result = result
+ mark_as_done(result)
@backend&.finalize
end
# Signals all fibers waiting for the thread to terminate.
#