lib/gofer/cluster.rb in gofer-0.5.0 vs lib/gofer/cluster.rb in gofer-0.6.0
- old
+ new
@@ -86,28 +86,40 @@
# Spawn +concurrency+ worker threads, each of which pops work off the
# +_in+ queue, and writes values to the +_out+ queue for syncronisation.
def threaded(meth, *args)
_in = run_queue
- length = run_queue.length
+ length = _in.length
_out = Queue.new
results = {}
- (0...concurrency).map do
+ errors = {}
+ results_semaphore = Mutex.new
+ errors_semaphore = Mutex.new
+ concurrency.times do
Thread.new do
loop do
host = _in.pop(false) rescue Thread.exit
- results[host] = host.send(meth, *args)
+ begin
+ result = host.send(meth, *args)
+ results_semaphore.synchronize { results[host] = result }
+ rescue Exception => e
+ errors_semaphore.synchronize { errors[host] = e }
+ end
_out << true
end
end
end
length.times do
_out.pop
end
- results
+ if errors.size > 0
+ raise Gofer::ClusterError.new(errors)
+ else
+ results
+ end
end
def run_queue
Queue.new.tap do |q|
@hosts.each do |h|