require 'thread' # Thread, Mutex
require 'rbconfig'
require 'parallel/version'

# Runs a block once per item (or once per worker) using either threads or
# forked worker processes, collecting the block results in input order.
module Parallel
  # Raised by Worker#work when the pipe to a worker process is broken
  # (Errno::EPIPE while sending a job, EOFError while reading its result).
  class DeadWorker < Exception
  end

  # Raise this inside a block to stop processing; the surrounding map call
  # then returns nil instead of re-raising.
  class Break < Exception
  end

  # Carries an exception from a worker process back to the parent.
  # Exceptions that cannot be marshalled are replaced by a RuntimeError that
  # embeds the original exception's #inspect output.
  class ExceptionWrapper
    attr_reader :exception

    def initialize(exception)
      unless dumpable?(exception)
        exception = RuntimeError.new("Undumpable Exception -- #{exception.inspect}")
      end

      @exception = exception
    end

    private

    # True when the object survives Marshal.dump.
    def dumpable?(object)
      Marshal.dump(object)
      true
    rescue StandardError
      false
    end
  end

  # Parent-side handle for one forked worker: its pid plus the pipe ends the
  # parent uses to send job indexes (write) and receive results (read).
  class Worker
    attr_reader :pid, :read, :write

    def initialize(read, write, pid)
      @read, @write, @pid = read, write, pid
    end

    # Closes both parent-side pipe ends.
    def close_pipes
      read.close
      write.close
    end

    # Blocks until the worker process has exited.
    def wait
      Process.wait(pid)
    rescue Interrupt
      # process died
    end

    # Sends the item index to the worker and returns whatever it sends back
    # (a result or an ExceptionWrapper).
    #
    # Raises DeadWorker when the pipe is closed on either side.
    def work(index)
      begin
        Marshal.dump(index, write)
      rescue Errno::EPIPE
        raise DeadWorker
      end

      begin
        Marshal.load(read)
      rescue EOFError
        raise DeadWorker
      end
    end
  end

  class << self
    # Runs the block in `count` threads, yielding the thread number, and
    # returns the block results indexed by thread number.
    #
    # options - an Integer thread count, or a Hash with :count (defaults to 2).
    def in_threads(options={:count => 2})
      count, options = extract_count_from_options(options)
      count ||= 2 # a Hash without :count would otherwise end up in nil.times

      out = []
      threads = []

      count.times do |i|
        threads[i] = Thread.new do
          out[i] = yield(i)
        end
      end

      kill_on_ctrl_c(threads) { wait_for_threads(threads) }

      out
    end

    # Runs the block in `count` worker processes, yielding the worker number.
    #
    # options - an Integer process count, or a Hash with :count
    #           (defaults to processor_count).
    def in_processes(options = {}, &block)
      count, options = extract_count_from_options(options)
      count ||= processor_count
      map(0...count, options.merge(:in_processes => count), &block)
    end

    # Like map, but discards the block results and returns the given array.
    def each(array, options={}, &block)
      map(array, options.merge(:preserve_results => false), &block)
      array
    end

    # Like each, but the block also receives the item's index.
    def each_with_index(array, options={}, &block)
      each(array, options.merge(:with_index => true), &block)
    end

    # Calls the block for every item and returns the results in input order.
    #
    # array   - anything responding to to_a (Array, Range, other Enumerable).
    # options - Hash:
    #           :in_threads       - number of threads to use
    #           :in_processes     - number of forked processes to use
    #           :with_index       - also pass the item's index to the block
    #           :preserve_results - false to return nil for every item
    #           :start, :finish   - callables invoked with (item, index)
    #                               around each item
    #
    # Without :in_threads / :in_processes the work is spread over
    # processor_count processes (threads when RUBY_PLATFORM matches /java/).
    # A worker count of 0, or an empty input, runs everything in the calling
    # thread. Returns nil when the block raised Parallel::Break.
    def map(array, options = {}, &block)
      array = array.to_a # turn Range and other Enumerable-s into an Array

      if RUBY_PLATFORM =~ /java/ && !options[:in_processes]
        method = :in_threads
        size = options[method] || processor_count
      elsif options[:in_threads]
        method = :in_threads
        size = options[method]
      else
        method = :in_processes
        if Process.respond_to?(:fork)
          size = options[method] || processor_count
        else
          $stderr.puts "Warning: Process.fork is not supported by this Ruby"
          size = 0
        end
      end

      # never start more workers than there are items
      size = [array.size, size].min

      if size == 0
        work_direct(array, options, &block)
      elsif method == :in_threads
        work_in_threads(array, options.merge(:count => size), &block)
      else
        work_in_processes(array, options.merge(:count => size), &block)
      end
    end

    # Like map, but the block also receives the item's index.
    def map_with_index(array, options={}, &block)
      map(array, options.merge(:with_index => true), &block)
    end

    # Number of processors seen by the OS and used for process scheduling.
    #
    # * AIX: /usr/sbin/pmcycles (AIX 5+), /usr/sbin/lsdev
    # * BSD: /sbin/sysctl
    # * Cygwin: /proc/cpuinfo
    # * Darwin: /usr/bin/hwprefs, /usr/sbin/sysctl
    # * HP-UX: /usr/sbin/ioscan
    # * IRIX: /usr/sbin/sysconf
    # * Linux: /proc/cpuinfo
    # * Minix 3+: /proc/cpuinfo
    # * Solaris: /usr/sbin/psrinfo
    # * Tru64 UNIX: /usr/sbin/psrinfo
    # * UnixWare: /usr/sbin/psrinfo
    #
    # The value is computed once and memoized. External commands are run
    # through the block form of IO.popen so the pipe is closed right away.
    def processor_count
      @processor_count ||= begin
        os_name = RbConfig::CONFIG["target_os"]
        if os_name =~ /mingw|mswin/
          require 'win32ole'
          result = WIN32OLE.connect("winmgmts://").ExecQuery(
            "select NumberOfLogicalProcessors from Win32_Processor")
          result.to_enum.collect(&:NumberOfLogicalProcessors).reduce(:+)
        elsif File.readable?("/proc/cpuinfo")
          IO.read("/proc/cpuinfo").scan(/^processor/).size
        elsif File.executable?("/usr/bin/hwprefs")
          IO.popen("/usr/bin/hwprefs thread_count", &:read).to_i
        elsif File.executable?("/usr/sbin/psrinfo")
          IO.popen("/usr/sbin/psrinfo", &:read).scan(/^.*on-*line/).size
        elsif File.executable?("/usr/sbin/ioscan")
          IO.popen("/usr/sbin/ioscan -kC processor", &:read).scan(/^.*processor/).size
        elsif File.executable?("/usr/sbin/pmcycles")
          IO.popen("/usr/sbin/pmcycles -m", &:read).count("\n")
        elsif File.executable?("/usr/sbin/lsdev")
          IO.popen("/usr/sbin/lsdev -Cc processor -S 1", &:read).count("\n")
        elsif File.executable?("/usr/sbin/sysconf") && os_name =~ /irix/i
          IO.popen("/usr/sbin/sysconf NPROC_ONLN", &:read).to_i
        elsif File.executable?("/usr/sbin/sysctl")
          IO.popen("/usr/sbin/sysctl -n hw.ncpu", &:read).to_i
        elsif File.executable?("/sbin/sysctl")
          IO.popen("/sbin/sysctl -n hw.ncpu", &:read).to_i
        else
          $stderr.puts "Unknown platform: " + os_name
          $stderr.puts "Assuming 1 processor."
          1
        end
      end
    end

    # Number of physical processor cores on the current system.
    #
    # Falls back to processor_count on other platforms or when the detected
    # value is not positive. The value is computed once and memoized.
    def physical_processor_count
      @physical_processor_count ||= begin
        ppc = case RbConfig::CONFIG["target_os"]
        when /darwin1/
          IO.popen("/usr/sbin/sysctl -n hw.physicalcpu", &:read).to_i
        when /linux/
          cores = {} # unique physical ID / core ID combinations
          # A String, so a "core id" line that is not preceded by any
          # "physical id" line still yields a valid key.
          phy = "0"
          IO.read("/proc/cpuinfo").scan(/^physical id.*|^core id.*/) do |ln|
            if ln.start_with?("physical")
              phy = ln[/\d+/]
            elsif ln.start_with?("core")
              cores["#{phy}:#{ln[/\d+/]}"] = true
            end
          end
          cores.count
        when /mswin|mingw/
          require 'win32ole'
          result_set = WIN32OLE.connect("winmgmts://").ExecQuery(
            "select NumberOfCores from Win32_Processor")
          result_set.to_enum.collect(&:NumberOfCores).reduce(:+)
        else
          processor_count
        end
        # fall back to logical count if physical info is invalid
        ppc > 0 ? ppc : processor_count
      end
    end

    private

    # Processes every item in the calling thread.
    #
    # Goes through the same helpers as the threaded and forked paths, so
    # :with_index, :preserve_results, :start and :finish behave the same way,
    # and Parallel::Break makes the call return nil.
    def work_direct(array, options, &block)
      results = []
      array.each_with_index do |item, index|
        results << with_instrumentation(item, index, options) do
          call_with_index(array, index, options, &block)
        end
      end
      results
    rescue Parallel::Break
      nil
    end

    # Processes the items with options[:count] threads that pull the next
    # unclaimed index from a shared counter.
    #
    # The first exception raised by the block stops all threads from claiming
    # further items and is handed to handle_exception once they have finished.
    def work_in_threads(items, options, &block)
      results = []
      current = -1
      exception = nil
      lock = Mutex.new # guards the shared index counter

      in_threads(options[:count]) do
        # as long as there are more items, work on one of them
        loop do
          break if exception
          index = lock.synchronize { current += 1 }
          break if index >= items.size

          with_instrumentation items[index], index, options do
            begin
              results[index] = call_with_index(items, index, options, &block)
            rescue Exception => e
              exception = e
              # leaves the instrumentation block only; the loop stops at the
              # `break if exception` check above
              break
            end
          end
        end
      end

      handle_exception(exception, results)
    end

    # Processes the items with options[:count] forked workers. Each worker is
    # driven by one parent thread that feeds it indexes from a shared counter
    # and stores what it sends back.
    def work_in_processes(items, options, &blk)
      workers = create_workers(items, options, &blk)
      current_index = -1
      results = []
      exception = nil
      lock = Mutex.new # guards the shared index counter

      kill_on_ctrl_c(workers.map(&:pid)) do
        in_threads(options[:count]) do |i|
          worker = workers[i]

          begin
            loop do
              break if exception
              index = lock.synchronize { current_index += 1 }
              break if index >= items.size

              output = with_instrumentation items[index], index, options do
                worker.work(index)
              end

              if ExceptionWrapper === output
                exception = output.exception
              else
                results[index] = output
              end
            end
          ensure
            worker.close_pipes
            worker.wait # if it goes zombie, rather wait here to be able to debug
          end
        end
      end

      handle_exception(exception, results)
    end

    # Forks options[:count] workers. Every new worker is told about the ones
    # started before it so it can close their inherited pipe ends.
    def create_workers(items, options, &block)
      workers = []
      options[:count].times do
        workers << worker(items, options.merge(:started_workers => workers), &block)
      end
      workers
    end

    # Forks one worker process connected to the parent by two pipes:
    # child_read/parent_write carries job indexes to the child,
    # parent_read/child_write carries results back.
    #
    # Returns the parent-side Worker handle.
    def worker(items, options, &block)
      # use less memory on REE
      GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

      child_read, parent_write = IO.pipe
      parent_read, child_write = IO.pipe

      pid = Process.fork do
        begin
          # close the pipe ends inherited from the parent that the child
          # does not use
          options.delete(:started_workers).each(&:close_pipes)

          parent_write.close
          parent_read.close

          process_incoming_jobs(child_read, child_write, items, options, &block)
        ensure
          child_read.close
          child_write.close
        end
      end

      child_read.close
      child_write.close

      Worker.new(parent_read, parent_write, pid)
    end

    # Worker-side loop: reads item indexes until the pipe reaches EOF, runs
    # the block for each and writes back the result, or an ExceptionWrapper
    # when the block raised.
    def process_incoming_jobs(read, write, items, options, &block)
      until read.eof?
        index = Marshal.load(read)
        result = begin
          call_with_index(items, index, options, &block)
        rescue Exception => e
          ExceptionWrapper.new(e)
        end
        Marshal.dump(result, write)
      end
    end

    # Joins every thread; an Interrupt while joining one thread does not
    # prevent the remaining ones from being joined.
    def wait_for_threads(threads)
      threads.compact.each do |t|
        begin
          t.join
        rescue Interrupt
          # thread died, do not stop other threads
        end
      end
    end

    # Returns nil for Parallel::Break, re-raises any other exception and
    # returns the results when there was none.
    def handle_exception(exception, results)
      return nil if exception.class == Parallel::Break
      raise exception if exception
      results
    end

    # options is either a Integer or a Hash with :count
    #
    # Returns [count, options_hash].
    def extract_count_from_options(options)
      if options.is_a?(Hash)
        count = options[:count]
      else
        count = options
        options = {}
      end
      [count, options]
    end

    # kill all these pids or threads if user presses Ctrl+c
    #
    # The SIGINT handler is installed on the first call; later (possibly
    # nested) calls only register their things for the duration of the block.
    def kill_on_ctrl_c(things)
      if defined?(@to_be_killed) && @to_be_killed
        @to_be_killed << things
      else
        @to_be_killed = [things]
        Signal.trap :SIGINT do
          if @to_be_killed.any?
            $stderr.puts 'Parallel execution interrupted, exiting ...'
            @to_be_killed.flatten.compact.each { |thing| kill_that_thing!(thing) }
          end
          exit 1 # Quit with 'failed' signal
        end
      end
      yield
    ensure
      @to_be_killed.pop # free threads for GC and do not kill pids that could be used for new processes
    end

    # Kills a Thread, or sends KILL to a process id.
    def kill_that_thing!(thing)
      if thing.is_a?(Thread)
        thing.kill
      else
        begin
          Process.kill(:KILL, thing)
        rescue Errno::ESRCH
          # some linux systems already automatically killed the children at this point
          # so we just ignore them not being there
        end
      end
    end

    # Calls the block with array[index], plus the index when
    # options[:with_index] is set. Returns nil instead of the block's value
    # when options[:preserve_results] is false.
    def call_with_index(array, index, options, &block)
      args = [array[index]]
      args << index if options[:with_index]
      if options[:preserve_results] == false
        block.call(*args)
        nil # avoid GC overhead of passing large results around
      else
        block.call(*args)
      end
    end

    # Calls options[:start] before and options[:finish] after the block, each
    # with (item, index). :finish also runs when the block raises.
    # Returns the block's value.
    def with_instrumentation(item, index, options)
      on_start = options[:start]
      on_finish = options[:finish]
      on_start.call(item, index) if on_start
      yield
    ensure
      on_finish.call(item, index) if on_finish
    end
  end
end