lib/gofer/cluster.rb in gofer-0.3.1 vs lib/gofer/cluster.rb in gofer-0.4.0

- old
+ new

@@ -1,73 +1,70 @@ require 'thread' module Gofer class Cluster + attr_reader :hosts attr_accessor :max_concurrency - def initialize(parties=[]) + + def initialize(parties=[], opts={}) @hosts = [] - @max_concurrency = nil + @max_concurrency = opts.delete(:max_concurrency) - parties.each do |i| - self << i - end + parties.each { |i| self << i } end + def concurrency + max_concurrency.nil? ? hosts.length : [max_concurrency, hosts.length].min + end + def <<(other) case other when Cluster - other.hosts.each do |host| - @hosts << host - end + other.hosts.each { |host| self << host } when Host @hosts << other end end - def run(opts={}, &block) - concurrency = opts[:max_concurrency] || max_concurrency || hosts.length - block.call(ClusterCommandRunner.new(hosts, concurrency)) + %w{run exist? read directory? ls upload read write}.each do |host_method| + define_method host_method do |*args| + threaded(host_method, *args) + end end - class ClusterCommandRunner - def initialize(hosts, concurrency) - @concurrency = concurrency - @hosts = hosts - end + private - # Spawn +concurrency+ worker threads, each of which pops work off the - # +_in+ queue, and writes values to the +_out+ queue for syncronisation. - def run(cmd, opts={}) - _in = run_queue - length = run_queue.length - _out = Queue.new - results = {} - (0...@concurrency).map do - Thread.new do - loop do - host = _in.pop(false) rescue Thread.exit + # Spawn +concurrency+ worker threads, each of which pops work off the + # +_in+ queue, and writes values to the +_out+ queue for syncronisation. + def threaded(meth, *args) + _in = run_queue + length = run_queue.length + _out = Queue.new + results = {} + (0...concurrency).map do + Thread.new do + loop do + host = _in.pop(false) rescue Thread.exit - results[host] = host.run(cmd, opts) - _out << true - end + results[host] = host.send(meth, *args) + _out << true end end + end - length.times do - _out.pop - end - - return results + length.times do + _out.pop end - def run_queue - Queue.new.tap do |q| - @hosts.each do |h| - q << h - end + results + end + + def run_queue + Queue.new.tap do |q| + @hosts.each do |h| + q << h end end end - end end