# Copyright (c) 2007 Samuel Williams. Released under the GNU GPLv3.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

require 'thread'

module RExec
  # Indices of the read and write ends in the two-element arrays returned by IO.pipe.
  RD = 0
  WR = 1

  # Closes every live IO object found via ObjectSpace, other than those listed in +except+
  # (by default $stdin, $stdout and $stderr). Errors raised while closing are ignored.
  def self.close_io(except = [$stdin, $stdout, $stderr])
    ObjectSpace.each_object(IO) do |io|
      next if except.include?(io)

      begin
        io.close
      rescue StandardError
        # Best effort: the object may already be closed or otherwise unusable.
      end
    end
  end

  class Task
    # NOTE: the class-level helpers below (pipes_for_options, close_pipes, dump_pipes) are internal.
    # They are defined with `def self.`, so a bare `private` marker would not affect them, and
    # they are called with an explicit receiver from instance methods, so they stay public.

    # Builds the [read, write] pairs for the child's stdin, stdout and stderr from +options+.
    #
    # Recognised keys are :passthrough (a symbol, an array of symbols, or :all) and :in, :out, :err.
    # For each stream: an IO is placed in the child's end of the pair (read end for :in, write end
    # for :out and :err), a two-element array is used as the pair itself, and anything else gets a
    # fresh IO.pipe.
    #
    # Returns an array of three pairs: [stdin_pair, stdout_pair, stderr_pair].
    # The caller's +options+ hash is not modified.
    def self.pipes_for_options(options)
      # Work on a copy so that expanding :passthrough does not leak into the caller's hash.
      options = options.dup

      passthrough = options[:passthrough]
      if passthrough
        if passthrough == :all
          passthrough = [:in, :out, :err]
        elsif passthrough.kind_of?(Symbol)
          passthrough = [passthrough]
        end

        standard = {:in => $stdin, :out => $stdout, :err => $stderr}
        passthrough.each do |name|
          options[name] = standard[name] if standard.key?(name)
        end
      end

      pipes = [[nil, nil], [nil, nil], [nil, nil]]

      # Each stream paired with the end of the pipe that belongs to the child.
      [[:in, RD], [:out, WR], [:err, WR]].each_with_index do |(name, child_end), idx|
        target = options[name]

        if target.kind_of?(IO)
          pipes[idx][child_end] = target
        elsif target.kind_of?(Array) && target.size == 2
          pipes[idx] = target
        else
          pipes[idx] = IO.pipe
        end
      end

      return pipes
    end

    # The process-wide standard streams; these are never closed or drained by the helpers below.
    STDPIPES = [STDIN, STDOUT, STDERR].freeze

    # Closes all the supplied pipes, skipping nil entries, the standard streams and
    # pipes that are already closed.
    def self.close_pipes(*pipes)
      pipes = pipes.compact.reject { |pipe| STDPIPES.include?(pipe) }

      pipes.each do |pipe|
        pipe.close unless pipe.closed?
      end
    end

    # Dumps any remaining data from the pipes to $stderr, line by line, until every pipe
    # is closed or has reached end of file. nil entries and the standard streams are skipped.
    def self.dump_pipes(*pipes)
      pipes = pipes.compact.reject { |pipe| STDPIPES.include?(pipe) }
      pipes.delete_if { |pipe| pipe.closed? }

      # Dump any output that was not consumed (errors, etc)
      until pipes.empty?
        readable, = IO.select(pipes)

        readable.each do |pipe|
          if pipe.closed? || pipe.eof?
            pipes.delete(pipe)
            next
          end

          $stderr.puts pipe.readline.chomp
        end
      end
    end

    # Returns true if the given pid is a current process, i.e. Process.getpgid succeeds for it.
    def self.running?(pid)
      !Process.getpgid(pid).nil?
    rescue StandardError
      # Any failure to look the pid up is treated as "not running".
      false
    end

    # Very simple method to spawn a child daemon. A daemon is detached from the controlling tty, and thus is
    # not killed when the parent process finishes.
    #
    #   spawn_daemon do
    #     Dir.chdir("/")
    #     File.umask 0000
    #     puts "Hello from daemon!"
    #     sleep(600)
    #     puts "This code will not quit when parent process finishes..."
    #     puts "...but $stdout might be closed unless you set it to a file."
    #   end
    #
    # Returns the pid of the daemon process.
    def self.spawn_daemon(&block)
      pid_pipe = IO.pipe

      launcher_pid = fork do
        Process.setsid

        # The intermediate process only exists to fork the daemon. Leave with exit! so it does
        # not run at_exit handlers or flush IO buffers inherited from the parent.
        exit!(0) if fork

        # Send the pid back to the parent
        pid_pipe[RD].close
        pid_pipe[WR].write(Process.pid.to_s)
        pid_pipe[WR].close

        yield

        exit(0)
      end

      pid_pipe[WR].close
      pid = pid_pipe[RD].read
      pid_pipe[RD].close

      # Reap the intermediate process so it does not linger as a zombie.
      begin
        Process.wait(launcher_pid)
      rescue Errno::ECHILD
        # Already reaped elsewhere; nothing left to do.
      end

      return pid.to_i
    end

    # Very simple method to spawn a child process
    #
    #   spawn_child do
    #     puts "Hello from child!"
    #   end
    #
    # Returns the pid of the child. The child leaves with exit!(0) once the block returns.
    def self.spawn_child(&block)
      pid = fork do
        yield

        exit!(0)
      end

      return pid
    end

    # Open a process. Similar to IO.popen, but provides a much more generic interface to stdin, stdout,
    # stderr and the pid. We also attempt to tidy up as much as possible given some kind of error or
    # exception. You are expected to write to output, and read from input and error.
    #
    # +command+ may be something that responds to +call+ (run inside the child), an array
    # (passed to exec as separate arguments), or anything else (converted with to_s and exec'd).
    #
    # Without a block the Task is returned. With a block the Task is yielded, its input is closed,
    # and the result of Task#wait is returned; the task is always closed afterwards.
    #
    # = Options =
    #
    # We can specify a pipe that will be redirected to the current processes pipe. A typical one is
    # :err, so that errors in the child process are printed directly to $stderr of the parent process.
    #   :passthrough => :err
    #   :passthrough => [:in, :out, :err] or :passthrough => :all
    #
    # We can specify a set of pipes other than the standard ones for redirecting to other things, eg
    #   :out => File.open("output.log", "a")
    #
    # If you need to supply a pipe manually, you can do that too:
    #   :in => IO.pipe
    #
    # You can specify :daemonize => true to cause the child process to detach. In this
    # case you will generally want to specify files for :in, :out, :err e.g.
    #
    #   :in => File.open("/dev/null"),
    #   :out => File.open("/var/log/my.log", "a"),
    #   :err => File.open("/var/log/my.err", "a")
    #
    def self.open(command, options = {}, &block)
      cin, cout, cerr = pipes_for_options(options)
      spawn = options[:daemonize] ? :spawn_daemon : :spawn_child

      cid = send(spawn) do
        # Inside the child: drop the parent's ends, then wire up the standard streams.
        close_pipes(cin[WR], cout[RD], cerr[RD])

        STDIN.reopen(cin[RD]) if cin[RD]
        STDOUT.reopen(cout[WR]) if cout[WR]
        STDERR.reopen(cerr[WR]) if cerr[WR]

        if command.respond_to?(:call)
          command.call
        elsif Array === command
          program, *arguments = command

          # If the program is a Pathname, we need to convert it to an absolute path if possible,
          # otherwise if it is relative it might cause problems.
          program = program.realpath.to_s if program.respond_to?(:realpath)

          exec(program, *arguments)
        else
          command = command.realpath if command.respond_to?(:realpath)

          exec(command.to_s)
        end
      end

      # Inside the parent: drop the child's ends.
      close_pipes(cin[RD], cout[WR], cerr[WR])

      task = Task.new(cin[WR], cout[RD], cerr[RD], cid)

      return task unless block_given?

      begin
        yield task
        task.close_input
        return task.wait
      ensure
        task.close
      end
    end

    # +input+ is the pipe written to by the parent (the child's stdin); +output+ and +error+ are
    # the pipes read by the parent. Any of them may be nil. +pid+ is the child's process id.
    def initialize(input, output, error, pid)
      @input = input
      @output = output
      @error = error

      @pid = pid

      @result = nil
      # One of :running, :waiting (some thread is inside Process.wait2) or :stopped.
      @status = :running
      @result_lock = Mutex.new
      @result_available = ConditionVariable.new
    end

    attr_reader :input
    attr_reader :output
    attr_reader :error
    attr_reader :pid
    attr_reader :result

    # Returns true if the current task is still running
    def running?
      return false unless self.class.running?(@pid)

      # The pid still seems alive, check that it isn't some other process using the same pid...
      @result_lock.synchronize do
        # If we haven't waited for it yet, it must be either a running process or a zombie...
        return @status != :stopped
      end
    end

    # Close all connections to the child process
    def close
      # Close our end of the child's stdin first: a child that is still reading its input would
      # otherwise never exit, and draining its output below would block forever.
      close_input

      begin
        self.class.dump_pipes(@output, @error)
      ensure
        self.class.close_pipes(@input, @output, @error)
      end
    end

    # Close input pipe to child process (if applicable)
    def close_input
      @input.close if @input && !@input.closed?
    end

    # Send a signal to the child process.
    # Raises Errno::ECHILD if the task is no longer running.
    def kill(signal = "INT")
      raise Errno::ECHILD unless running?

      Process.kill(signal, @pid)
    end

    # Wait for the child process to finish, return the exit status.
    # This function can be called from multiple threads.
    #
    # If Process.wait2 raises, the exception propagates to the caller that was performing the
    # wait and the task goes back to :running, so other (or later) callers are not left blocked.
    def wait
      @result_lock.synchronize do
        # If some other caller is already waiting on the result, wait for that to finish.
        # Re-check the state after every wake-up.
        @result_available.wait(@result_lock) while @status == :waiting

        return @result if @status == :stopped

        # Else we are the first to wait: mark that a wait is in progress...
        @status = :waiting
      end

      begin
        # Wait for the result...
        _pid, result = Process.wait2(@pid)

        # The result is now available...
        @result_lock.synchronize do
          @result = result
          @status = :stopped
        end
      ensure
        @result_lock.synchronize do
          # Still :waiting here means wait2 did not complete; let another caller retry.
          @status = :running if @status == :waiting
        end

        # Notify other threads...
        @result_available.broadcast
      end

      # Return the result
      return @result
    end

    # Forcefully stop the child process.
    def stop
      if running?
        close_input
        kill
      end
    end
  end
end