lib/polyphony/extensions/thread.rb in polyphony-1.5 vs lib/polyphony/extensions/thread.rb in polyphony-1.6

- old
+ new

@@ -12,11 +12,10 @@ # Initializes the thread. # @param args [Array] arguments to pass to thread block def initialize(*args, &block) @join_wait_queue = [] - @finalization_mutex = Mutex.new @args = args @block = block orig_initialize { execute } end @@ -39,20 +38,11 @@ # terminated without propagating the timeout exception. # # @param timeout [Number] timeout interval # @return [any] thread's return value def join(timeout = nil) - watcher = Fiber.current.auto_watcher - - @finalization_mutex.synchronize do - if @terminated - @result.is_a?(Exception) ? (raise @result) : (return @result) - else - @join_wait_queue << watcher - end - end - timeout ? move_on_after(timeout) { watcher.await } : watcher.await + timeout ? move_on_after(timeout) { await_done } : await_done end alias_method :await, :join # @!visibility private alias_method :orig_raise, :raise @@ -60,31 +50,28 @@ # Raises an exception in the context of the thread. If no exception is given, # a `RuntimeError` is raised. # # @param error [Exception, Class, nil] exception spec def raise(error = nil) - Thread.pass until @main_fiber - error = RuntimeError.new if error.nil? - error = RuntimeError.new(error) if error.is_a?(String) - error = error.new if error.is_a?(Class) - sleep 0.0001 until @ready - main_fiber&.raise(error) + + error = Exception.instantiate(error) + if Thread.current == self + Kernel.raise(error) + else + @main_fiber&.raise(error) + end end # @!visibility private alias_method :orig_kill, :kill # Terminates the thread. # # @return [Thread] self - def kill - return self if @terminated + alias_method :kill, :kill_safe - raise Polyphony::Terminate - end - # @!visibility private alias_method :orig_inspect, :inspect # Returns a string representation of the thread for debugging purposes. # @@ -126,10 +113,15 @@ # @return [Proc] idle handler def on_idle(&block) backend.idle_proc = block end + def value + join + @result.is_a?(Exception) ? raise(@result) : @result + end + private # Runs the thread's block, handling any uncaught exceptions. # # @return [any] thread result value @@ -157,16 +149,16 @@ # Finalizes the thread. # # @param result [any] thread's return value def finalize(result) + # We need to make sure the fiber is not on the runqueue. This, in order to + # prevent a race condition between #finalize and #kill. + fiber_unschedule(Fiber.current) Fiber.current.shutdown_all_children if !Fiber.current.children.empty? - @finalization_mutex.synchronize do - @terminated = true - @result = result - signal_waiters(result) - end + @result = result + mark_as_done(result) @backend&.finalize end # Signals all fibers waiting for the thread to terminate. #