lib/ztk/parallel.rb in ztk-0.0.5 vs lib/ztk/parallel.rb in ztk-0.0.6
- old
+ new
@@ -1,22 +1,22 @@
################################################################################
#
# Author: Zachary Patten <zachary@jovelabs.com>
# Copyright: Copyright (c) Jove Labs
-# License: Apache License, Vers::IOn 2.0
+# License: Apache License, VersIOn 2.0
#
-# Licensed under the Apache License, Vers::IOn 2.0 (the "License");
+# Licensed under the Apache License, VersIOn 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDIT::IONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permiss::IOns and
-# limitat::IOns under the License.
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissIOns and
+# limitatIOns under the License.
#
################################################################################
require "base64"
@@ -24,14 +24,18 @@
module ZTK
################################################################################
- class Parallel < ::ZTK::Base
+ class ParallelError < Error; end
################################################################################
+ class Parallel < ZTK::Base
+
+################################################################################
+
attr_accessor :results
################################################################################
def initialize(config={})
@@ -39,44 +43,46 @@
:max_forks => %x( grep -c processor /proc/cpuinfo ).chomp.to_i,
:before_fork => nil,
:after_fork => nil
}.merge(config))
- @forks = ::Array.new
- @results = ::Array.new
- ::GC.respond_to?(:copy_on_write_friendly=) and ::GC.copy_on_write_friendly = true
+ @forks = Array.new
+ @results = Array.new
+ GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true
end
################################################################################
- def process(*args)
- @config.logger and @config.logger.debug{ "forks(#{@forks.inspect})" }
+ def process(&block)
+ raise ParallelError, "You must supply a block to the process method!" if !block_given?
+ log(:debug) { "forks(#{@forks.inspect})" }
+
while (@forks.count >= @config.max_forks) do
wait
end
- child_reader, parent_writer = ::IO.pipe
- parent_reader, child_writer = ::IO.pipe
+ child_reader, parent_writer = IO.pipe
+ parent_reader, child_writer = IO.pipe
- @config.before_fork and @config.before_fork.call(::Process.pid)
- pid = ::Process.fork do
- @config.after_fork and @config.after_fork.call(::Process.pid)
+ @config.before_fork and @config.before_fork.call(Process.pid)
+ pid = Process.fork do
+ @config.after_fork and @config.after_fork.call(Process.pid)
parent_writer.close
parent_reader.close
- if !(data = yield).nil?
- @config.logger and @config.logger.debug{ "write(#{data.inspect})" }
- child_writer.write(::Base64.encode64(::Marshal.dump(data)))
+ if !(data = block.call).nil?
+ log(:debug) { "write(#{data.inspect})" }
+ child_writer.write(Base64.encode64(Marshal.dump(data)))
end
child_reader.close
child_writer.close
- ::Process.exit!(0)
+ Process.exit!(0)
end
- @config.after_fork and @config.after_fork.call(::Process.pid)
+ @config.after_fork and @config.after_fork.call(Process.pid)
child_reader.close
child_writer.close
fork = {:reader => parent_reader, :writer => parent_writer, :pid => pid}
@@ -86,39 +92,39 @@
end
################################################################################
def wait
- @config.logger and @config.logger.debug{ "forks(#{@forks.inspect})" }
- pid, status = (::Process.wait2(-1, ::Process::WNOHANG) rescue nil)
+ log(:debug) { "forks(#{@forks.inspect})" }
+ pid, status = (Process.wait2(-1) rescue nil)
if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil?
- data = (::Marshal.load(::Base64.decode64(fork[:reader].read.to_s)) rescue nil)
- @config.logger and @config.logger.debug{ "read(#{data.inspect})" }
+ data = (Marshal.load(Base64.decode64(fork[:reader].read.to_s)) rescue nil)
+ log(:debug) { "read(#{data.inspect})" }
!data.nil? and @results.push(data)
fork[:reader].close
fork[:writer].close
@forks -= [fork]
return [pid, status, data]
end
- sleep(1)
nil
end
################################################################################
def waitall
- data = ::Array.new
+ data = Array.new
while @forks.count > 0
- data << self.wait
+ result = self.wait
+ result and data << result
end
data
end
################################################################################
def count
- @config.logger and @config.logger.debug{ "count(#{@forks.count})" }
+ log(:debug) { "count(#{@forks.count})" }
@forks.count
end
################################################################################