lib/gofer/cluster.rb in gofer-0.5.0 vs lib/gofer/cluster.rb in gofer-0.6.0

- old
+ new

@@ -86,28 +86,40 @@ # Spawn +concurrency+ worker threads, each of which pops work off the # +_in+ queue, and writes values to the +_out+ queue for syncronisation. def threaded(meth, *args) _in = run_queue - length = run_queue.length + length = _in.length _out = Queue.new results = {} - (0...concurrency).map do + errors = {} + results_semaphore = Mutex.new + errors_semaphore = Mutex.new + concurrency.times do Thread.new do loop do host = _in.pop(false) rescue Thread.exit - results[host] = host.send(meth, *args) + begin + result = host.send(meth, *args) + results_semaphore.synchronize { results[host] = result } + rescue Exception => e + errors_semaphore.synchronize { errors[host] = e } + end _out << true end end end length.times do _out.pop end - results + if errors.size > 0 + raise Gofer::ClusterError.new(errors) + else + results + end end def run_queue Queue.new.tap do |q| @hosts.each do |h|