lib/ztk/parallel.rb in ztk-0.0.5 vs lib/ztk/parallel.rb in ztk-0.0.6

- old
+ new

@@ -1,22 +1,22 @@ ################################################################################ # # Author: Zachary Patten <zachary@jovelabs.com> # Copyright: Copyright (c) Jove Labs -# License: Apache License, Vers::IOn 2.0 +# License: Apache License, VersIOn 2.0 # -# Licensed under the Apache License, Vers::IOn 2.0 (the "License"); +# Licensed under the Apache License, VersIOn 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDIT::IONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permiss::IOns and -# limitat::IOns under the License. +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissIOns and +# limitatIOns under the License. # ################################################################################ require "base64" @@ -24,14 +24,18 @@ module ZTK ################################################################################ - class Parallel < ::ZTK::Base + class ParallelError < Error; end ################################################################################ + class Parallel < ZTK::Base + +################################################################################ + attr_accessor :results ################################################################################ def initialize(config={}) @@ -39,44 +43,46 @@ :max_forks => %x( grep -c processor /proc/cpuinfo ).chomp.to_i, :before_fork => nil, :after_fork => nil }.merge(config)) - @forks = ::Array.new - @results = ::Array.new - ::GC.respond_to?(:copy_on_write_friendly=) and ::GC.copy_on_write_friendly = true + @forks = Array.new + @results = Array.new + GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true end ################################################################################ - def process(*args) - @config.logger and @config.logger.debug{ "forks(#{@forks.inspect})" } + def process(&block) + raise ParallelError, "You must supply a block to the process method!" if !block_given? + log(:debug) { "forks(#{@forks.inspect})" } + while (@forks.count >= @config.max_forks) do wait end - child_reader, parent_writer = ::IO.pipe - parent_reader, child_writer = ::IO.pipe + child_reader, parent_writer = IO.pipe + parent_reader, child_writer = IO.pipe - @config.before_fork and @config.before_fork.call(::Process.pid) - pid = ::Process.fork do - @config.after_fork and @config.after_fork.call(::Process.pid) + @config.before_fork and @config.before_fork.call(Process.pid) + pid = Process.fork do + @config.after_fork and @config.after_fork.call(Process.pid) parent_writer.close parent_reader.close - if !(data = yield).nil? - @config.logger and @config.logger.debug{ "write(#{data.inspect})" } - child_writer.write(::Base64.encode64(::Marshal.dump(data))) + if !(data = block.call).nil? + log(:debug) { "write(#{data.inspect})" } + child_writer.write(Base64.encode64(Marshal.dump(data))) end child_reader.close child_writer.close - ::Process.exit!(0) + Process.exit!(0) end - @config.after_fork and @config.after_fork.call(::Process.pid) + @config.after_fork and @config.after_fork.call(Process.pid) child_reader.close child_writer.close fork = {:reader => parent_reader, :writer => parent_writer, :pid => pid} @@ -86,39 +92,39 @@ end ################################################################################ def wait - @config.logger and @config.logger.debug{ "forks(#{@forks.inspect})" } - pid, status = (::Process.wait2(-1, ::Process::WNOHANG) rescue nil) + log(:debug) { "forks(#{@forks.inspect})" } + pid, status = (Process.wait2(-1) rescue nil) if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil? - data = (::Marshal.load(::Base64.decode64(fork[:reader].read.to_s)) rescue nil) - @config.logger and @config.logger.debug{ "read(#{data.inspect})" } + data = (Marshal.load(Base64.decode64(fork[:reader].read.to_s)) rescue nil) + log(:debug) { "read(#{data.inspect})" } !data.nil? and @results.push(data) fork[:reader].close fork[:writer].close @forks -= [fork] return [pid, status, data] end - sleep(1) nil end ################################################################################ def waitall - data = ::Array.new + data = Array.new while @forks.count > 0 - data << self.wait + result = self.wait + result and data << result end data end ################################################################################ def count - @config.logger and @config.logger.debug{ "count(#{@forks.count})" } + log(:debug) { "count(#{@forks.count})" } @forks.count end ################################################################################