################################################################################
#
#      Author: Zachary Patten <zachary@jovelabs.com>
#   Copyright: Copyright (c) Jove Labs
#     License: Apache License, Version 2.0
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
################################################################################

require 'etc'

module ZTK

  # Runs blocks in forked child processes and collects the values they return.
  #
  # Each call to #process forks one child (up to +:max_forks+ at a time); the
  # child marshals the block's return value back to the parent over a pipe.
  # #wait / #waitall reap finished children and hand the values back.
  #
  # When ActiveRecord is loaded, its connection is dropped before forking and
  # re-established on both sides afterwards; without ActiveRecord those steps
  # are skipped.
  class Parallel

################################################################################

    # Bytes requested per non-blocking read while draining a child's pipe.
    READ_CHUNK_SIZE = 65_536

    # Upper bound, in seconds, on how long #waitall idles between polls.
    POLL_INTERVAL = 0.01

    # Values returned by finished children (not filled when +:one_shot+ is set).
    attr_accessor :results

################################################################################

    # @param options [Hash] the hash is only read, never modified
    # @option options [Integer] :max_forks maximum number of concurrently
    #   tracked children; defaults to the detected processor count (at least 1)
    # @option options [Boolean] :one_shot when true, child return values are
    #   not accumulated in #results (default: false)
    def initialize(options={})
      GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

      # The processor count is only probed when the caller did not supply one.
      @max_forks = options.fetch(:max_forks) { default_max_forks }
      @one_shot  = options.fetch(:one_shot, false)

      @forks   = []
      @results = []
    end

################################################################################

    # Forks a child that runs the given block and sends its return value back.
    #
    # @yieldreturn [Object] a Marshal-able value; blank values are not sent
    # @return [Integer, nil] the child's pid, or nil (without forking or
    #   running the block) when +:max_forks+ children are already tracked
    def process
      return nil if @forks.count >= @max_forks

      # The first pipe (parent -> child) is kept open for the child's lifetime,
      # but nothing in this class writes to it; results travel over the second.
      child_reader, parent_writer = IO.pipe
      parent_reader, child_writer = IO.pipe
      # Marshal output is binary; binmode keeps the bytes untouched in transit.
      [child_reader, parent_writer, parent_reader, child_writer].each(&:binmode)

      disconnect_database

      pid = Process.fork do
        begin
          connect_database

          parent_writer.close
          parent_reader.close

          data = yield
          child_writer.write(Marshal.dump(data)) if present_value?(data)

          child_reader.close
          child_writer.close
        rescue StandardError => e
          # Leave through exit! as on the success path, so at_exit handlers
          # inherited from the parent do not run inside the child.
          $stderr.puts("#{e.class}: #{e.message}")
          Process.exit!(1)
        end

        Process.exit!(0)
      end

      connect_database

      # The parent must drop its copies of the child's ends, otherwise the
      # result pipe would never report end-of-file.
      child_reader.close
      child_writer.close

      @forks << {
        :reader => parent_reader,
        :writer => parent_writer,
        :pid    => pid,
        :buffer => String.new,
        :eof    => false
      }

      pid
    end

################################################################################

    # Non-blocking check for a finished child.
    #
    # Pending pipe data is drained first: a child whose payload is larger than
    # the OS pipe buffer blocks in +write+ until the parent reads, so reading
    # only after the child has exited would never make progress.
    #
    # @return [Array(Integer, Process::Status, Object), nil] pid, exit status
    #   and unmarshalled result (nil if none was sent or it could not be
    #   decoded) of one finished child; nil when no tracked child has finished
    def wait
      drain_pipes

      pid, status = reap_child
      return nil unless pid && status

      entry = @forks.find { |candidate| candidate[:pid] == pid }
      return nil unless entry

      # The child has exited, so this read returns whatever is left and stops
      # at end-of-file.
      payload = entry[:buffer] << entry[:reader].read.to_s
      data = load_payload(payload)
      @results.push(data) if present_value?(data) && !@one_shot

      entry[:reader].close
      entry[:writer].close
      @forks.delete(entry)

      [pid, status, data]
    end

################################################################################

    # Blocks until every tracked child has been reaped.
    #
    # @return [Array<Array>] one +[pid, status, data]+ triple per child, in the
    #   order the children were reaped
    def waitall
      finished = []

      while @forks.count > 0
        if (result = wait)
          finished << result
        else
          idle
        end
      end

      finished
    end

################################################################################

    # @return [Integer] number of children currently tracked
    def count
      @forks.count
    end

################################################################################

    private

    # Counts "processor" lines in /proc/cpuinfo, falls back to
    # Etc.nprocessors where that file is unavailable, and never returns less
    # than 1 so that #process can always fork at least one child.
    def default_max_forks
      detected =
        if File.readable?('/proc/cpuinfo')
          File.readlines('/proc/cpuinfo').grep(/processor/).size
        elsif Etc.respond_to?(:nprocessors)
          Etc.nprocessors
        else
          0
        end

      [detected, 1].max
    end

    # True when a value is worth sending/keeping. Uses ActiveSupport's
    # #present? when it is loaded; otherwise treats nil, false and empty
    # collections/strings as blank.
    def present_value?(value)
      return value.present? if value.respond_to?(:present?)

      value.respond_to?(:empty?) ? !value.empty? : !!value
    end

    # Whether ActiveRecord is loaded in this process.
    def active_record?
      defined?(ActiveRecord::Base) ? true : false
    end

    # Drops the ActiveRecord connection ahead of a fork (no-op without ActiveRecord).
    def disconnect_database
      ActiveRecord::Base.connection.disconnect! if active_record?
    end

    # (Re-)establishes the ActiveRecord connection (no-op without ActiveRecord).
    def connect_database
      ActiveRecord::Base.establish_connection if active_record?
    end

    # Moves whatever each child has written so far into its buffer, without blocking.
    def drain_pipes
      @forks.each do |entry|
        next if entry[:eof]

        begin
          loop { entry[:buffer] << entry[:reader].read_nonblock(READ_CHUNK_SIZE) }
        rescue IO::WaitReadable
          # Nothing more to read right now.
        rescue EOFError
          entry[:eof] = true
        end
      end
    end

    # Reaps any one exited child without blocking; nil when none has exited
    # or the process has no children at all.
    def reap_child
      Process.wait2(-1, Process::WNOHANG)
    rescue SystemCallError
      nil
    end

    # Decodes a child's payload. Best effort: an empty or truncated payload
    # yields nil. The bytes come from a child forked by this object, not from
    # an external source.
    def load_payload(payload)
      return nil if payload.empty?

      Marshal.load(payload)
    rescue StandardError
      nil
    end

    # Sleeps until a child produces output or POLL_INTERVAL elapses. Pipes
    # already at end-of-file are left out because they always select as readable.
    def idle
      pending = @forks.reject { |entry| entry[:eof] }.map { |entry| entry[:reader] }

      if pending.empty?
        sleep(POLL_INTERVAL)
      else
        IO.select(pending, nil, nil, POLL_INTERVAL)
      end
    end

################################################################################

  end
end

################################################################################