lib/rake/comp_tree/algorithm.rb in drake-0.8.4.1.0.15 vs lib/rake/comp_tree/algorithm.rb in drake-0.8.4.1.0.16
- old
+ new
@@ -1,147 +1,162 @@
require 'rake/comp_tree/diagnostic'
+module Rake end
module Rake::CompTree
module Algorithm
include Diagnostic
- def compute_multithreaded(root, num_threads, use_fork, buckets)
+ module_function
+
+ def loop_with(leave, again)
+ catch(leave) {
+ while true
+ catch(again) {
+ yield
+ }
+ end
+ }
+ end
+
+ def compute_multithreaded(root, num_threads)
trace "Computing #{root.name} with #{num_threads} threads"
+
result = nil
- mutex = Mutex.new
+
+ tree_mutex = Mutex.new
node_finished_condition = ConditionVariable.new
thread_wake_condition = ConditionVariable.new
- threads = []
-
- # workaround: jruby gives "run" status for waiting on
- # condition variable
- num_threads_ready = 0
+ num_threads_in_use = 0
- num_threads.times { |thread_index|
- threads << Thread.new {
+ threads = (0...num_threads).map { |thread_index|
+ Thread.new {
#
# wait for main thread
#
- mutex.synchronize {
+ tree_mutex.synchronize {
trace "Thread #{thread_index} waiting to start"
- num_threads_ready += 1
- thread_wake_condition.wait(mutex)
+ num_threads_in_use += 1
+ thread_wake_condition.wait(tree_mutex)
}
- while true
- trace "Thread #{thread_index} node search"
-
- #
- # Done! Thread will exit.
- #
- break if mutex.synchronize {
- result
+ loop_with(:leave, :again) {
+ node = tree_mutex.synchronize {
+ trace "Thread #{thread_index} aquired tree lock; begin node search"
+ if result
+ trace "Thread #{thread_index} detected finish"
+ num_threads_in_use -= 1
+ throw :leave
+ else
+ #
+ # Find a node. The node we obtain, if any, will be locked.
+ #
+ if node = find_node(root)
+ trace "Thread #{thread_index} found node #{node.name}"
+ node
+ else
+ trace "Thread #{thread_index}: no node found; sleeping."
+ thread_wake_condition.wait(tree_mutex)
+ throw :again
+ end
+ end
}
- #
- # Lock the tree and find a node. The node we
- # obtain, if any, is already locked.
- #
- node = mutex.synchronize {
- find_node(root)
- }
+ trace "Thread #{thread_index} computing node"
+ node_result = compute_node(node)
+ trace "Thread #{thread_index} node computed; waiting for tree lock"
- if node
- trace "Thread #{thread_index} found node #{node.name}"
-
- node_result =
- compute_node(
- node,
- use_fork,
- buckets ? buckets[thread_index] : nil)
-
- mutex.synchronize {
- node.result = node_result
+ tree_mutex.synchronize {
+ trace "Thread #{thread_index} acquired tree lock"
+ debug {
+ name = "#{node.name}" + ((node == root) ? " (ROOT NODE)" : "")
+ initial = "Thread #{thread_index} compute result for #{name}: "
+ status = node_result.is_a?(Exception) ? "error" : "success"
+ trace initial + status
+ trace "Thread #{thread_index} node result: #{node_result}"
}
+ node.result = node_result
+
#
# remove locks for this node (shared lock and own lock)
#
- mutex.synchronize {
- node.unlock
- if node == root
- #
- # Root node was computed; we are done.
- #
- trace "Thread #{thread_index} got final answer"
- result = root.result
- end
- node_finished_condition.signal
- }
- else
- trace "Thread #{thread_index}: no node found; sleeping."
- mutex.synchronize {
- thread_wake_condition.wait(mutex)
- }
- end
- end
+ node.unlock
+
+ if node == root or node_result.is_a? Exception
+ #
+ # Root node was computed or error occurred; we are done.
+ #
+ result = node_result
+ end
+
+ #
+ # Tell the main thread that another node was computed.
+ #
+ node_finished_condition.signal
+ }
+ }
trace "Thread #{thread_index} exiting"
}
}
trace "Main: waiting for threads to launch and block."
- while true
- break if mutex.synchronize {
- num_threads_ready == num_threads
- }
+ until tree_mutex.synchronize { num_threads_in_use == num_threads }
Thread.pass
end
-
- trace "Main: entering main loop"
- mutex.synchronize {
- while true
+
+ tree_mutex.synchronize {
+ trace "Main: entering main loop"
+ until num_threads_in_use == 0
trace "Main: waking threads"
thread_wake_condition.broadcast
if result
trace "Main: detected finish."
break
end
trace "Main: waiting for a node"
- node_finished_condition.wait(mutex)
+ node_finished_condition.wait(tree_mutex)
trace "Main: got a node"
end
}
trace "Main: waiting for threads to finish."
- catch(:done) {
- while true
- mutex.synchronize {
- throw :done if threads.all? { |thread|
- thread.status == false
- }
- thread_wake_condition.broadcast
- }
- Thread.pass
- end
+ loop_with(:leave, :again) {
+ tree_mutex.synchronize {
+ if threads.all? { |thread| thread.status == false }
+ throw :leave
+ end
+ thread_wake_condition.broadcast
+ }
+ Thread.pass
}
trace "Main: computation done."
- result
+ if result.is_a? Exception
+ raise result
+ else
+ result
+ end
end
def find_node(node)
- # --- only called inside mutex
+ # --- only called inside shared tree mutex
trace "Looking for a node, starting with #{node.name}"
if node.result
#
# already computed
#
trace "#{node.name} has been computed"
nil
- elsif node.children_results and node.try_lock
+ elsif (children_results = node.find_children_results) and node.try_lock
#
# Node is not computed and its children are computed;
# and we have the lock. Ready to compute.
#
+ node.children_results = children_results
node
else
#
# locked or children not computed; recurse to children
#
@@ -153,57 +168,15 @@
}
nil
end
end
- def compute_node(node, use_fork, bucket)
- if use_fork
- trace "About to fork for node #{node.name}"
- if bucket
- #
- # Use our assigned bucket to transfer the result.
- #
- fork_node(node) {
- node.trace_compute
- bucket.contents = node.compute
- }
- bucket.contents
- else
- #
- # No bucket -- discarding result
- #
- fork_node(node) {
- node.trace_compute
- node.compute
- }
- true
- end
- else
- #
- # No fork
- #
+ def compute_node(node)
+ begin
node.trace_compute
node.compute
+ rescue Exception => e
+ e
end
end
-
- def fork_node(node)
- trace "About to fork for node #{node.name}"
- process_id = RetriableFork.fork {
- trace "Fork: process #{Process.pid}"
- node.trace_compute
- yield
- trace "Fork: computation done"
- }
- trace "Waiting for process #{process_id}"
- Process.wait(process_id)
- trace "Process #{process_id} finished"
- exitstatus = $?.exitstatus
- if exitstatus != 0
- trace "Process #{process_id} returned #{exitstatus}; exiting."
- exit(1)
- end
- end
-
- extend self
end
end