lib/gofer/cluster.rb in gofer-0.3.1 vs lib/gofer/cluster.rb in gofer-0.4.0
- old
+ new
@@ -1,73 +1,70 @@
require 'thread'
module Gofer
class Cluster
+
attr_reader :hosts
attr_accessor :max_concurrency
- def initialize(parties=[])
+
+ def initialize(parties=[], opts={})
@hosts = []
- @max_concurrency = nil
+ @max_concurrency = opts.delete(:max_concurrency)
- parties.each do |i|
- self << i
- end
+ parties.each { |i| self << i }
end
+ def concurrency
+ max_concurrency.nil? ? hosts.length : [max_concurrency, hosts.length].min
+ end
+
def <<(other)
case other
when Cluster
- other.hosts.each do |host|
- @hosts << host
- end
+ other.hosts.each { |host| self << host }
when Host
@hosts << other
end
end
- def run(opts={}, &block)
- concurrency = opts[:max_concurrency] || max_concurrency || hosts.length
- block.call(ClusterCommandRunner.new(hosts, concurrency))
+ %w{run exist? read directory? ls upload read write}.each do |host_method|
+ define_method host_method do |*args|
+ threaded(host_method, *args)
+ end
end
- class ClusterCommandRunner
- def initialize(hosts, concurrency)
- @concurrency = concurrency
- @hosts = hosts
- end
+ private
- # Spawn +concurrency+ worker threads, each of which pops work off the
- # +_in+ queue, and writes values to the +_out+ queue for syncronisation.
- def run(cmd, opts={})
- _in = run_queue
- length = run_queue.length
- _out = Queue.new
- results = {}
- (0...@concurrency).map do
- Thread.new do
- loop do
- host = _in.pop(false) rescue Thread.exit
+ # Spawn +concurrency+ worker threads, each of which pops work off the
+ # +_in+ queue, and writes values to the +_out+ queue for syncronisation.
+ def threaded(meth, *args)
+ _in = run_queue
+ length = run_queue.length
+ _out = Queue.new
+ results = {}
+ (0...concurrency).map do
+ Thread.new do
+ loop do
+ host = _in.pop(false) rescue Thread.exit
- results[host] = host.run(cmd, opts)
- _out << true
- end
+ results[host] = host.send(meth, *args)
+ _out << true
end
end
+ end
- length.times do
- _out.pop
- end
-
- return results
+ length.times do
+ _out.pop
end
- def run_queue
- Queue.new.tap do |q|
- @hosts.each do |h|
- q << h
- end
+ results
+ end
+
+ def run_queue
+ Queue.new.tap do |q|
+ @hosts.each do |h|
+ q << h
end
end
end
-
end
end