Sha256: 1f650c466ada3f1ab5d19e730dd3d956444449d2545ef1801892eb3f7ff30ac6

Contents?: true

Size: 1.73 KB

Versions: 1

Compression:

Stored size: 1.73 KB

Contents

require 'thread'

module Enumerable 

  def forkoff options = {}, &block
    n = Integer( options['processes'] || options[:processes] || 8 )
    done = Object.new
    qs = Array.new(n){ Queue.new }
    results = Array.new(n){ Queue.new }

    #
    # consumers
    #
      threads = []

      n.times do |i|
        thread =
          Thread.new(i) do |i|
            Thread.current.abort_on_exception = true

            loop do
              value = qs[i].pop
              break if value == done 
              args, index = value

              r, w = IO.pipe
              pid = fork

              unless pid
                r.close
                result =
                  begin
                    block.call(*args)
                  rescue Object => e
                    e
                  end
                w.write( Marshal.dump( result ) )
                exit
              end

              w.close
              result = Marshal.load( r.read )
              results[i].push( [result, index] )
              Process.waitpid pid
            end

            results[i].push( done )
          end

        threads << thread
      end

    #
    # producer
    #
      each_with_index do |args, i|
        qs[ i.modulo(qs.size) ].push( [args, i] )
      end

    #
    # mark the end of each queue
    #
      qs.each do |q|
        q.push done
      end

    #
    # wait for all threads to complete
    #
      threads.each do |t|
        t.value
      end

    #
    # gather results
    #
      list = []

      results.each do |q|
        loop do
          value = q.pop
          break if value == done
          result, index = value
          list[index] = result
        end
      end

      list
  end

  alias_method 'forkoff!', 'forkoff'
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
forkoff-0.0.0 lib/forkoff.rb