lib/ztk/parallel.rb in ztk-1.16.2 vs lib/ztk/parallel.rb in ztk-1.17.0
- old
+ new
@@ -94,10 +94,24 @@
# => nil
#
# @author Zachary Patten <zachary AT jovelabs DOT com>
class Parallel < ZTK::Base
+ class Break < ParallelError; end
+
+ # Tests if we can marshal an exception via the results; otherwise creates
+ # an exception we can marshal.
+ class ExceptionWrapper
+ attr_reader :exception
+
+ def initialize(exception)
+ dumpable = (Marshal.dump(exception) rescue nil)
+ dumpable.nil? and (exception = RuntimeError.new(exception.inspect))
+ @exception = exception
+ end
+ end
+
# Default Maximum Number of Forks
MAX_FORKS = case RUBY_PLATFORM
when /darwin/ then
%x( sysctl hw.ncpu ).strip.split(':').last.strip.to_i
when /linux/ then
@@ -127,10 +141,19 @@
(config.max_forks < 1) and log_and_raise(ParallelError, "max_forks must be equal to or greater than one!")
@forks = Array.new
@results = Array.new
GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true
+
+ %w( kill term int hup ).map(&:upcase).each do |signal|
+ Signal.trap(signal) do
+ $stderr.puts("SIG#{signal} received by PID##{Process.pid}; aborting parallel executing...")
+
+ signal_all(signal)
+ exit!(1)
+ end
+ end
end
# Process in parallel.
#
# @yield Block should execute tasks to be performed in parallel.
@@ -149,22 +172,31 @@
child_reader, parent_writer = IO.pipe
parent_reader, child_writer = IO.pipe
config.before_fork and config.before_fork.call(Process.pid)
pid = Process.fork do
+
config.after_fork and config.after_fork.call(Process.pid)
parent_writer.close
parent_reader.close
- if !(data = block.call).nil?
+ data = nil
+ begin
+ data = block.call
+ rescue Exception => e
+ data = ExceptionWrapper.new(e)
+ end
+
+ if !data.nil?
config.ui.logger.debug { "write(#{data.inspect})" }
child_writer.write(Base64.encode64(Marshal.dump(data)))
end
child_reader.close
child_writer.close
+
Process.exit!(0)
end
config.after_fork and config.after_fork.call(Process.pid)
child_reader.close
@@ -186,14 +218,18 @@
# is returned.
def wait
config.ui.logger.debug { "wait" }
config.ui.logger.debug { "forks(#{@forks.inspect})" }
pid, status = (Process.wait2(-1) rescue nil)
+
if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil?
data = (Marshal.load(Base64.decode64(fork[:reader].read.to_s)) rescue nil)
config.ui.logger.debug { "read(#{data.inspect})" }
+
+ process_exception_data(data)
!data.nil? and @results.push(data)
+
fork[:reader].close
fork[:writer].close
@forks -= [fork]
return [pid, status, data]
@@ -210,15 +246,45 @@
self.wait
end
@results
end
+ # Signals all forks.
+ #
+ # @return [Integer] The number of processes signaled.
+ def signal_all(signal="KILL")
+ signaled = 0
+ if (!@forks.nil? && (@forks.count > 0))
+ @forks.each do |fork|
+ begin
+ Process.kill(signal, fork[:pid])
+ signaled += 1
+ rescue
+ nil
+ end
+ end
+ end
+ signaled
+ end
+
# Count of active forks.
#
# @return [Integer] Current number of active forks.
def count
config.ui.logger.debug { "count(#{@forks.count})" }
@forks.count
+ end
+
+
+ private
+
+ def process_exception_data(data)
+ return if !(ZTK::Parallel::ExceptionWrapper === data)
+
+ config.ui.logger.fatal { "exception(#{data.exception.inspect})" }
+
+ signal_all
+ raise data.exception
end
end
end