lib/parallel.rb in grosser-parallel-0.3.0 vs lib/parallel.rb in grosser-parallel-0.3.1

- old
+ new

@@ -1,7 +1,9 @@ class Parallel - def self.in_threads(count=2) + VERSION = File.read( File.join(File.dirname(__FILE__),'..','VERSION') ).strip + + def self.in_threads(count = 2) out = [] threads = [] count.times do |i| threads[i] = Thread.new do @@ -11,61 +13,70 @@ threads.each{|t| t.join } out end - def self.in_processes(count=nil) - count ||= processor_count - - #start writing results into n pipes + def self.in_processes(count = processor_count) + # Start writing results into n pipes reads = [] writes = [] pids = [] count.times do |i| reads[i], writes[i] = IO.pipe - pids << Process.fork{ Marshal.dump(yield(i), writes[i]) } #write serialized result + pids << Process.fork do + Marshal.dump(yield(i), writes[i]) # Serialize result + end end kill_on_ctrl_c(pids) - #collect results from pipes simultanously - #otherwise pipes get stuck when to much is written (buffer full) + # Collect results from pipes simultanously + # otherwise pipes get stuck when to much is written (buffer full) out = [] collectors = [] count.times do |i| collectors << Thread.new do writes[i].close - out[i]="" + out[i] = '' while text = reads[i].gets out[i] += text end reads[i].close end end - collectors.each{|c|c.join} + collectors.each{|c| c.join } - out.map{|x| Marshal.load(x)} #deserialize + out.map{|x| Marshal.load(x) } # Deserialize results end - def self.map(array, options={}) - count = if options[:in_threads] - method = 'in_threads' - options[:in_threads] + def self.map(array, options = {}) + require 'thread' # to get Thread.exclusive + + if options[:in_threads] + method = :in_threads + size = options[method] else - method = 'in_processes' - options[:in_processes] || processor_count + method = :in_processes + size = options[method] || processor_count end + # work in #{size} threads that use threads/processes results = [] - in_groups_of(array, count).each do |group| - results += send(method, group.size) do |i| - yield group[i] + current = -1 + + in_threads(size) do + # as long as there are more items, work on one of them + loop do + index = Thread.exclusive{ current+=1 } + break if index >= array.size + results[index] = *send(method, 1){ yield array[index] } end end + results end def self.processor_count case RUBY_PLATFORM @@ -76,14 +87,14 @@ end end private - def self.in_groups_of(array, count) + def self.in_groups_of(array, size) results = [] loop do - slice = array[(results.size * count)...((results.size+1) * count)] + slice = array[(results.size * size)...((results.size+1) * size)] if slice.nil? or slice.empty? break else results << slice end @@ -91,12 +102,12 @@ results end #handle user interrup (Ctrl+c) def self.kill_on_ctrl_c(pids) - Signal.trap 'SIGINT' do - STDERR.puts "Parallel execution interrupted, exiting ..." - pids.each { |pid| Process.kill("KILL", pid) } - exit 1 + Signal.trap :SIGINT do + $stderr.puts 'Parallel execution interrupted, exiting ...' + pids.each { |pid| Process.kill(:KILL, pid) } + exit 1 # Quit with 'failed' signal end end end \ No newline at end of file