Sha256: 1f650c466ada3f1ab5d19e730dd3d956444449d2545ef1801892eb3f7ff30ac6
Contents?: true
Size: 1.73 KB
Versions: 1
Compression:
Stored size: 1.73 KB
Contents
require 'thread' module Enumerable def forkoff options = {}, &block n = Integer( options['processes'] || options[:processes] || 8 ) done = Object.new qs = Array.new(n){ Queue.new } results = Array.new(n){ Queue.new } # # consumers # threads = [] n.times do |i| thread = Thread.new(i) do |i| Thread.current.abort_on_exception = true loop do value = qs[i].pop break if value == done args, index = value r, w = IO.pipe pid = fork unless pid r.close result = begin block.call(*args) rescue Object => e e end w.write( Marshal.dump( result ) ) exit end w.close result = Marshal.load( r.read ) results[i].push( [result, index] ) Process.waitpid pid end results[i].push( done ) end threads << thread end # # producer # each_with_index do |args, i| qs[ i.modulo(qs.size) ].push( [args, i] ) end # # mark the end of each queue # qs.each do |q| q.push done end # # wait for all threads to complete # threads.each do |t| t.value end # # gather results # list = [] results.each do |q| loop do value = q.pop break if value == done result, index = value list[index] = result end end list end alias_method 'forkoff!', 'forkoff' end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
forkoff-0.0.0 | lib/forkoff.rb |