lib/rake/comp_tree/algorithm.rb in drake-0.8.4.1.0.15 vs lib/rake/comp_tree/algorithm.rb in drake-0.8.4.1.0.16

- old
+ new

@@ -1,147 +1,162 @@ require 'rake/comp_tree/diagnostic' +module Rake end module Rake::CompTree module Algorithm include Diagnostic - def compute_multithreaded(root, num_threads, use_fork, buckets) + module_function + + def loop_with(leave, again) + catch(leave) { + while true + catch(again) { + yield + } + end + } + end + + def compute_multithreaded(root, num_threads) trace "Computing #{root.name} with #{num_threads} threads" + result = nil - mutex = Mutex.new + + tree_mutex = Mutex.new node_finished_condition = ConditionVariable.new thread_wake_condition = ConditionVariable.new - threads = [] - - # workaround: jruby gives "run" status for waiting on - # condition variable - num_threads_ready = 0 + num_threads_in_use = 0 - num_threads.times { |thread_index| - threads << Thread.new { + threads = (0...num_threads).map { |thread_index| + Thread.new { # # wait for main thread # - mutex.synchronize { + tree_mutex.synchronize { trace "Thread #{thread_index} waiting to start" - num_threads_ready += 1 - thread_wake_condition.wait(mutex) + num_threads_in_use += 1 + thread_wake_condition.wait(tree_mutex) } - while true - trace "Thread #{thread_index} node search" - - # - # Done! Thread will exit. - # - break if mutex.synchronize { - result + loop_with(:leave, :again) { + node = tree_mutex.synchronize { + trace "Thread #{thread_index} aquired tree lock; begin node search" + if result + trace "Thread #{thread_index} detected finish" + num_threads_in_use -= 1 + throw :leave + else + # + # Find a node. The node we obtain, if any, will be locked. + # + if node = find_node(root) + trace "Thread #{thread_index} found node #{node.name}" + node + else + trace "Thread #{thread_index}: no node found; sleeping." + thread_wake_condition.wait(tree_mutex) + throw :again + end + end } - # - # Lock the tree and find a node. The node we - # obtain, if any, is already locked. - # - node = mutex.synchronize { - find_node(root) - } + trace "Thread #{thread_index} computing node" + node_result = compute_node(node) + trace "Thread #{thread_index} node computed; waiting for tree lock" - if node - trace "Thread #{thread_index} found node #{node.name}" - - node_result = - compute_node( - node, - use_fork, - buckets ? buckets[thread_index] : nil) - - mutex.synchronize { - node.result = node_result + tree_mutex.synchronize { + trace "Thread #{thread_index} acquired tree lock" + debug { + name = "#{node.name}" + ((node == root) ? " (ROOT NODE)" : "") + initial = "Thread #{thread_index} compute result for #{name}: " + status = node_result.is_a?(Exception) ? "error" : "success" + trace initial + status + trace "Thread #{thread_index} node result: #{node_result}" } + node.result = node_result + # # remove locks for this node (shared lock and own lock) # - mutex.synchronize { - node.unlock - if node == root - # - # Root node was computed; we are done. - # - trace "Thread #{thread_index} got final answer" - result = root.result - end - node_finished_condition.signal - } - else - trace "Thread #{thread_index}: no node found; sleeping." - mutex.synchronize { - thread_wake_condition.wait(mutex) - } - end - end + node.unlock + + if node == root or node_result.is_a? Exception + # + # Root node was computed or error occurred; we are done. + # + result = node_result + end + + # + # Tell the main thread that another node was computed. + # + node_finished_condition.signal + } + } trace "Thread #{thread_index} exiting" } } trace "Main: waiting for threads to launch and block." - while true - break if mutex.synchronize { - num_threads_ready == num_threads - } + until tree_mutex.synchronize { num_threads_in_use == num_threads } Thread.pass end - - trace "Main: entering main loop" - mutex.synchronize { - while true + + tree_mutex.synchronize { + trace "Main: entering main loop" + until num_threads_in_use == 0 trace "Main: waking threads" thread_wake_condition.broadcast if result trace "Main: detected finish." break end trace "Main: waiting for a node" - node_finished_condition.wait(mutex) + node_finished_condition.wait(tree_mutex) trace "Main: got a node" end } trace "Main: waiting for threads to finish." - catch(:done) { - while true - mutex.synchronize { - throw :done if threads.all? { |thread| - thread.status == false - } - thread_wake_condition.broadcast - } - Thread.pass - end + loop_with(:leave, :again) { + tree_mutex.synchronize { + if threads.all? { |thread| thread.status == false } + throw :leave + end + thread_wake_condition.broadcast + } + Thread.pass } trace "Main: computation done." - result + if result.is_a? Exception + raise result + else + result + end end def find_node(node) - # --- only called inside mutex + # --- only called inside shared tree mutex trace "Looking for a node, starting with #{node.name}" if node.result # # already computed # trace "#{node.name} has been computed" nil - elsif node.children_results and node.try_lock + elsif (children_results = node.find_children_results) and node.try_lock # # Node is not computed and its children are computed; # and we have the lock. Ready to compute. # + node.children_results = children_results node else # # locked or children not computed; recurse to children # @@ -153,57 +168,15 @@ } nil end end - def compute_node(node, use_fork, bucket) - if use_fork - trace "About to fork for node #{node.name}" - if bucket - # - # Use our assigned bucket to transfer the result. - # - fork_node(node) { - node.trace_compute - bucket.contents = node.compute - } - bucket.contents - else - # - # No bucket -- discarding result - # - fork_node(node) { - node.trace_compute - node.compute - } - true - end - else - # - # No fork - # + def compute_node(node) + begin node.trace_compute node.compute + rescue Exception => e + e end end - - def fork_node(node) - trace "About to fork for node #{node.name}" - process_id = RetriableFork.fork { - trace "Fork: process #{Process.pid}" - node.trace_compute - yield - trace "Fork: computation done" - } - trace "Waiting for process #{process_id}" - Process.wait(process_id) - trace "Process #{process_id} finished" - exitstatus = $?.exitstatus - if exitstatus != 0 - trace "Process #{process_id} returned #{exitstatus}; exiting." - exit(1) - end - end - - extend self end end