# $Id: spawner.rb 49 2007-11-29 00:57:15Z tim_pease $

require 'rbconfig'
require 'thread'
require 'tempfile'

# == Synopsis
#
# A class for spawning child processes and ensuring those children continue
# running.
#
# == Details
#
# When a spawner is created it is given the command to run in a child
# process. This child process has +stdin+, +stdout+, and +stderr+ redirected
# to +/dev/null+ (this works even on Windows). When the child dies for any
# reason, the spawner will restart a new child process in the exact same
# manner as the original.
#
class Spawner

  # Null device used as the default for the children's standard streams.
  @dev_null = File.exist?('/dev/null') ? '/dev/null' : 'NUL:'

  # Locate a usable Ruby interpreter: first the one this process was built
  # with (per RbConfig), then whatever "ruby" resolves to on the PATH.
  # RbConfig is used because the old ::Config constant no longer exists in
  # current Ruby versions.
  c = RbConfig::CONFIG
  ruby = "#{File.join(c['bindir'], c['ruby_install_name'])}#{c['EXEEXT']}"
  @ruby = if system('%s -e exit' % ruby) then ruby
          elsif system('ruby -e exit') then 'ruby'
          else warn 'no ruby in PATH/CONFIG'   # warn returns nil, so @ruby is nil
          end

  class << self
    attr_reader :ruby
    attr_reader :dev_null

    # call-seq:
    #    Spawner.finalizer( cids )    => lambda
    #
    # Returns a lambda that sends SIGTERM to every child process in _cids_.
    # The lambda is built here, at class level, so that it closes over
    # _cids_ only and not over a spawner instance.
    #
    # The lambda remembers the pid of the process that created it and does
    # nothing when invoked from any other process. It accepts (and ignores)
    # any arguments: ObjectSpace passes the object id to a finalizer, while
    # Spawner#stop invokes it with no arguments at all.
    #
    def finalizer(cids)
      pid = $$
      lambda do |*_args|
        cids.kill('TERM', :all) if pid == $$
      end
    end
  end

  # call-seq:
  #    Spawner.new( command, *args, opts = {} )
  #
  # Creates a new spawner that will execute the given external _command_ in
  # a sub-process. The calling semantics of <tt>Kernel::exec</tt> are
  # used to execute the _command_. Any number of optional _args_ can be
  # passed to the _command_.
  #
  # Available options:
  #
  #    :spawn   => the number of child processes to spawn
  #    :pause   => wait time (in seconds) before respawning after termination
  #    :ruby    => the Ruby interpreter to use when spawning children
  #    :env     => a hash for the child process environment
  #    :stdin   => stdin child processes will read from
  #    :stdout  => stdout child processes will write to
  #    :stderr  => stderr child processes will write to
  #
  # The <tt>:env</tt> option is used to add environment variables to
  # child processes when they are spawned.
  #
  # *Note:* all spawned child processes will use the same stdin, stdout, and
  # stderr if they are given in the options. Otherwise they all default to
  # <tt>/dev/null</tt> on *NIX and <tt>NUL:</tt> on Windows.
  #
  # Raises ArgumentError when no _command_ is given.
  #
  def initialize(*args)
    config = {
      :ruby   => self.class.ruby,
      :spawn  => 1,
      :pause  => 0,
      :stdin  => self.class.dev_null,
      :stdout => self.class.dev_null,
      :stderr => self.class.dev_null
    }
    config.merge! args.pop if Hash === args.last
    config[:argv] = args

    raise ArgumentError, 'wrong number of arguments' if args.empty?

    @stop  = true
    @cids  = []
    @group = ThreadGroup.new

    # These three settings are used by the spawner itself; everything left
    # in config is serialized into the child program.
    @spawn = config.delete(:spawn)
    @pause = config.delete(:pause)
    @ruby  = config.delete(:ruby)

    @tmp = child_program(config)

    # Create the lock up front, before any spawner thread exists, so that
    # two threads can never race to lazily create two different mutexes.
    @cids.instance_variable_set(:@mutex, Mutex.new)

    class << @cids
      # call-seq:
      #    sync {block}
      #
      # Executes the given block in a synchronized fashion -- i.e. only a
      # single thread can execute at a time. Uses Mutex under the hood.
      #
      def sync(&b)
        @mutex.synchronize(&b)
      end

      # call-seq:
      #    kill( signal, num )     => number signalled
      #    kill( signal, :all )    => number signalled
      #
      # Send the _signal_ to a given _num_ of child processes or all child
      # processes if <tt>:all</tt> is given instead of a number. Returns
      # the number of child processes the signal was sent to (0 when there
      # are no children). Children that can no longer be signalled are
      # removed from the list.
      #
      # Raises ArgumentError if the second argument is neither <tt>:all</tt>
      # nor an Integer.
      #
      def kill(signal, arg)
        return 0 if empty?

        # Take a snapshot under the lock; signalling happens outside of it.
        ary = sync do
          case arg
          when :all    then dup
          when Integer then slice(0, arg)
          else raise ArgumentError
          end
        end

        ary.each do |cid|
          begin
            Process.kill(signal, cid)
          rescue SystemCallError
            sync { delete cid }
          end
        end
        ary.length
      end
    end
  end

  attr_reader :spawn
  attr_accessor :pause

  # call-seq:
  #    spawner.spawn = num
  #
  # Set the number of child processes to spawn. The absolute value of _num_
  # is used. When the spawner is running, raising the number starts
  # additional spawner threads, and lowering it sends SIGTERM to the
  # surplus child processes (their spawner threads then exit instead of
  # respawning).
  #
  def spawn=(num)
    num = num.abs
    diff, @spawn = num - @spawn, num
    return unless running?

    if diff > 0
      diff.times { _spawn }
    elsif diff < 0
      @cids.kill 'TERM', diff.abs
    end
  end

  # call-seq:
  #    start    => self
  #
  # Spawn the sub-processes. Does nothing if the spawner is already running.
  #
  def start
    return self if running?
    @stop = false

    # Make sure the children receive SIGTERM if this spawner is finalized
    # without stop having been called.
    @cleanup = Spawner.finalizer(@cids)
    ObjectSpace.define_finalizer(self, @cleanup)

    @spawn.times { _spawn }
    self
  end

  # call-seq:
  #    stop( timeout = 5 )    => self
  #
  # Stop any spawned sub-processes. Does nothing if the spawner is not
  # running.
  #
  def stop(timeout = 5)
    return self unless running?
    @stop = true

    @cleanup.call
    ObjectSpace.undefine_finalizer(self)

    # the cleanup call sends SIGTERM to all the child processes
    # however, some might still be hanging around, so we are going to wait
    # for a timeout interval and then send a SIGKILL to any remaining child
    # processes
    nap_time = 0.05 * timeout   # sleep for 5% of the timeout interval
    deadline = Time.now + timeout

    until @cids.empty?
      sleep nap_time
      next if Time.now < deadline

      @cids.kill 'KILL', :all
      @cids.clear
      @group.list.each { |t| t.kill }
      break
    end

    self
  end

  # call-seq:
  #    restart( timeout = 5 )
  #
  # Stops the spawner (see #stop) and then starts it again (see #start).
  #
  def restart(timeout = 5)
    stop(timeout)
    start
  end

  # call-seq:
  #    running?
  #
  # Returns +true+ if the spawner is currently running; returns +false+
  # otherwise.
  #
  def running?
    !@stop
  end

  # call-seq:
  #    join( timeout = nil )    => spawner or nil
  #
  # The calling thread will suspend execution until all child processes have
  # been stopped. Does not return until all spawner threads have exited (the
  # child processes have been stopped) or until a join times out. The
  # _timeout_ (in seconds) is applied to each spawner thread in turn. If a
  # timeout expires +nil+ will be returned; otherwise the spawner is
  # returned.
  #
  def join(limit = nil)
    loop do
      t = @group.list.first
      break if t.nil?
      return nil unless t.join(limit)
    end
    self
  end


  private

  # call-seq:
  #    _spawn    => thread
  #
  # Creates a thread that will spawn the sub-process via
  # <tt>IO::popen</tt>. If the sub-process terminates, it will be
  # respawned until the +stop+ message is sent to this spawner.
  #
  # If an Exception is encountered during the spawning process, a message
  # will be printed to stderr and the thread will exit.
  #
  def _spawn
    t = Thread.new do
      catch(:die) do
        loop do
          begin
            io = IO.popen("#{@ruby} #{@tmp.path}", 'r')

            # The child program prints its pid as its first line of output.
            cid = io.gets.to_i

            # A cid of 0 means no pid was read. Never hand that to
            # Process.wait: a pid of 0 means "any child in this process
            # group" and could reap a child owned by another thread.
            if cid > 0
              @cids.sync { @cids << cid }
              Process.wait cid
            end
          rescue Exception => e
            STDERR.puts e.inspect
            STDERR.puts e.backtrace.join("\n")
            throw :die
          ensure
            begin
              io.close if io
            rescue StandardError
              nil
            end
            @cids.sync {
              @cids.delete cid
              # Exit instead of respawning when enough children remain,
              # e.g. after the spawn count has been lowered.
              throw :die unless @cids.length < @spawn
            }
          end

          throw :die if @stop
          sleep @pause
        end
      end
    end

    @group.add t
    t
  end

  # call-seq:
  #    child_program( config )    => tempfile
  #
  # Creates a child Ruby program based on the given _config_ hash. The
  # following hash keys are used:
  #
  #    :argv    => command and arguments passed to <tt>Kernel::exec</tt>
  #    :env     => environment variables for the child process
  #    :cwd     => the current working directory to use for the child process
  #    :stdin   => stdin the child process will read from
  #    :stdout  => stdout the child process will write to
  #    :stderr  => stderr the child process will write to
  #
  # The _config_ is marshalled and embedded in the program text. The
  # generated program writes its own pid to stdout, reopens its standard
  # streams and then execs the command. The closed Tempfile holding the
  # program is returned.
  #
  def child_program(config)
    config = Marshal.dump(config)

    tmp = Tempfile.new(self.class.name.downcase)
    tmp.write <<-PROG
      begin
        config = Marshal.load(#{config.inspect})

        argv = config[:argv]
        env  = config[:env]
        cwd  = config[:cwd]

        stdin  = config[:stdin]
        stdout = config[:stdout]
        stderr = config[:stderr]

        Dir.chdir cwd if cwd
        env.each {|k,v| ENV[k.to_s] = v.to_s} if env
      rescue Exception => e
        STDERR.puts e.inspect
        abort
      end

      STDOUT.puts Process.pid
      STDOUT.flush

      STDIN.reopen stdin
      STDOUT.reopen stdout
      STDERR.reopen stderr

      exec *argv
    PROG

    tmp.close
    tmp
  end
end  # class Spawner

# EOF