lib/ztk/parallel.rb in ztk-1.16.2 vs lib/ztk/parallel.rb in ztk-1.17.0

- old
+ new

@@ -94,10 +94,24 @@ # => nil # # @author Zachary Patten <zachary AT jovelabs DOT com> class Parallel < ZTK::Base + class Break < ParallelError; end + + # Tests if we can marshal an exception via the results; otherwise creates + # an exception we can marshal. + class ExceptionWrapper + attr_reader :exception + + def initialize(exception) + dumpable = (Marshal.dump(exception) rescue nil) + dumpable.nil? and (exception = RuntimeError.new(exception.inspect)) + @exception = exception + end + end + # Default Maximum Number of Forks MAX_FORKS = case RUBY_PLATFORM when /darwin/ then %x( sysctl hw.ncpu ).strip.split(':').last.strip.to_i when /linux/ then @@ -127,10 +141,19 @@ (config.max_forks < 1) and log_and_raise(ParallelError, "max_forks must be equal to or greater than one!") @forks = Array.new @results = Array.new GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true + + %w( kill term int hup ).map(&:upcase).each do |signal| + Signal.trap(signal) do + $stderr.puts("SIG#{signal} received by PID##{Process.pid}; aborting parallel executing...") + + signal_all(signal) + exit!(1) + end + end end # Process in parallel. # # @yield Block should execute tasks to be performed in parallel. @@ -149,22 +172,31 @@ child_reader, parent_writer = IO.pipe parent_reader, child_writer = IO.pipe config.before_fork and config.before_fork.call(Process.pid) pid = Process.fork do + config.after_fork and config.after_fork.call(Process.pid) parent_writer.close parent_reader.close - if !(data = block.call).nil? + data = nil + begin + data = block.call + rescue Exception => e + data = ExceptionWrapper.new(e) + end + + if !data.nil? config.ui.logger.debug { "write(#{data.inspect})" } child_writer.write(Base64.encode64(Marshal.dump(data))) end child_reader.close child_writer.close + Process.exit!(0) end config.after_fork and config.after_fork.call(Process.pid) child_reader.close @@ -186,14 +218,18 @@ # is returned. def wait config.ui.logger.debug { "wait" } config.ui.logger.debug { "forks(#{@forks.inspect})" } pid, status = (Process.wait2(-1) rescue nil) + if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil? data = (Marshal.load(Base64.decode64(fork[:reader].read.to_s)) rescue nil) config.ui.logger.debug { "read(#{data.inspect})" } + + process_exception_data(data) !data.nil? and @results.push(data) + fork[:reader].close fork[:writer].close @forks -= [fork] return [pid, status, data] @@ -210,15 +246,45 @@ self.wait end @results end + # Signals all forks. + # + # @return [Integer] The number of processes signaled. + def signal_all(signal="KILL") + signaled = 0 + if (!@forks.nil? && (@forks.count > 0)) + @forks.each do |fork| + begin + Process.kill(signal, fork[:pid]) + signaled += 1 + rescue + nil + end + end + end + signaled + end + # Count of active forks. # # @return [Integer] Current number of active forks. def count config.ui.logger.debug { "count(#{@forks.count})" } @forks.count + end + + + private + + def process_exception_data(data) + return if !(ZTK::Parallel::ExceptionWrapper === data) + + config.ui.logger.fatal { "exception(#{data.exception.inspect})" } + + signal_all + raise data.exception end end end