lib/rexec/task.rb in rexec-1.1.12 vs lib/rexec/task.rb in rexec-1.2.1
- old
+ new
@@ -11,315 +11,329 @@
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-class String
- # Helper for turning a string into a shell argument
- def to_arg
- match(/\s/) ? dump : self
- end
-
- def to_cmd
- return self
- end
-end
+require 'thread'
-class Array
- # Helper for turning an array of items into a command line string
- # <tt>["ls", "-la", "/My Path"].to_cmd => "ls -la \"/My Path\""</tt>
- def to_cmd
- collect{ |a| a.to_arg }.join(" ")
- end
-end
+module RExec
+ RD = 0
+ WR = 1
-class Pathname
- # Helper for turning a pathname into a command line string
- def to_cmd
- to_s
- end
-end
+ # This function closes all IO other than $stdin, $stdout, $stderr
+ def self.close_io(except = [$stdin, $stdout, $stderr])
+ # Make sure all file descriptors are closed
+ ObjectSpace.each_object(IO) do |io|
+ unless except.include?(io)
+ io.close rescue nil
+ end
+ end
+ end
-module RExec
- RD = 0
- WR = 1
+ class Task
+ private
+ def self.pipes_for_options(options)
+ pipes = [[nil, nil], [nil, nil], [nil, nil]]
- # This function closes all IO other than $stdin, $stdout, $stderr
- def self.close_io(except = [$stdin, $stdout, $stderr])
- # Make sure all file descriptors are closed
- ObjectSpace.each_object(IO) do |io|
- unless except.include?(io)
- io.close rescue nil
- end
- end
- end
+ if options[:passthrough]
+ passthrough = options[:passthrough]
- class Task
- private
- def self.pipes_for_options(options)
- pipes = [[nil, nil], [nil, nil], [nil, nil]]
+ if passthrough == :all
+ passthrough = [:in, :out, :err]
+ elsif passthrough.kind_of?(Symbol)
+ passthrough = [passthrough]
+ end
- if options[:passthrough]
- passthrough = options[:passthrough]
-
- if passthrough == :all
- passthrough = [:in, :out, :err]
- elsif passthrough.kind_of?(Symbol)
- passthrough = [passthrough]
- end
-
- passthrough.each do |name|
- case(name)
- when :in
- options[:in] = $stdin
- when :out
- options[:out] = $stdout
- when :err
- options[:err] = $stderr
- end
- end
- end
-
- modes = [RD, WR, WR]
- {:in => 0, :out => 1, :err => 2}.each do |name, idx|
- m = modes[idx]
- p = options[name]
-
- if p.kind_of?(IO)
- pipes[idx][m] = p
- elsif p.kind_of?(Array) and p.size == 2
- pipes[idx] = p
- else
- pipes[idx] = IO.pipe
- end
- end
+ passthrough.each do |name|
+ case(name)
+ when :in
+ options[:in] = $stdin
+ when :out
+ options[:out] = $stdout
+ when :err
+ options[:err] = $stderr
+ end
+ end
+ end
- return pipes
- end
-
- # Close all the supplied pipes
- def close_pipes(*pipes)
- pipes.compact!
+ modes = [RD, WR, WR]
+ {:in => 0, :out => 1, :err => 2}.each do |name, idx|
+ m = modes[idx]
+ p = options[name]
- pipes.each do |pipe|
- pipe.close unless pipe.closed?
- end
- end
+ if p.kind_of?(IO)
+ pipes[idx][m] = p
+ elsif p.kind_of?(Array) and p.size == 2
+ pipes[idx] = p
+ else
+ pipes[idx] = IO.pipe
+ end
+ end
- # Dump any remaining data from the pipes, until they are closed.
- def dump_pipes(*pipes)
- pipes.compact!
+ return pipes
+ end
- pipes.delete_if { |pipe| pipe.closed? }
- # Dump any output that was not consumed (errors, etc)
- while pipes.size > 0
- result = IO.select(pipes)
+ # Close all the supplied pipes
+ STDPIPES = [STDIN, STDOUT, STDERR]
+ def self.close_pipes(*pipes)
+ pipes = pipes.compact.reject{|pipe| STDPIPES.include?(pipe)}
- result[0].each do |pipe|
- if pipe.closed? || pipe.eof?
- pipes.delete(pipe)
- next
- end
+ pipes.each do |pipe|
+ pipe.close unless pipe.closed?
+ end
+ end
- $stderr.puts pipe.readline.chomp
- end
- end
- end
-
- public
- # Returns true if the given pid is a current process
- def self.running?(pid)
- gpid = Process.getpgid(pid) rescue nil
+ # Dump any remaining data from the pipes, until they are closed.
+ def self.dump_pipes(*pipes)
+ pipes = pipes.compact.reject{|pipe| STDPIPES.include?(pipe)}
- return gpid != nil ? true : false
- end
-
- # Very simple method to spawn a child daemon. A daemon is detatched from the controlling tty, and thus is
- # not killed when the parent process finishes.
- # <tt>
- # spawn_daemon do
- # Dir.chdir("/")
- # File.umask 0000
- # puts "Hello from daemon!"
- # sleep(600)
- # puts "This code will not quit when parent process finishes..."
- # puts "...but $stdout might be closed unless you set it to a file."
- # end
- # </tt>
- def self.spawn_daemon(&block)
- pid_pipe = IO.pipe
+ pipes.delete_if { |pipe| pipe.closed? }
+ # Dump any output that was not consumed (errors, etc)
+ while pipes.size > 0
+ result = IO.select(pipes)
- fork do
- Process.setsid
- exit if fork
+ result[0].each do |pipe|
+ if pipe.closed? || pipe.eof?
+ pipes.delete(pipe)
+ next
+ end
- # Send the pid back to the parent
- pid_pipe[RD].close
- pid_pipe[WR].write(Process.pid.to_s)
- pid_pipe[WR].close
+ $stderr.puts pipe.readline.chomp
+ end
+ end
+ end
- yield
+ public
+ # Returns true if the given pid is a current process
+ def self.running?(pid)
+ gpid = Process.getpgid(pid) rescue nil
- exit(0)
- end
+ return gpid != nil ? true : false
+ end
- pid_pipe[WR].close
- pid = pid_pipe[RD].read
- pid_pipe[RD].close
+ # Very simple method to spawn a child daemon. A daemon is detatched from the controlling tty, and thus is
+ # not killed when the parent process finishes.
+ # <tt>
+ # spawn_daemon do
+ # Dir.chdir("/")
+ # File.umask 0000
+ # puts "Hello from daemon!"
+ # sleep(600)
+ # puts "This code will not quit when parent process finishes..."
+ # puts "...but $stdout might be closed unless you set it to a file."
+ # end
+ # </tt>
+ def self.spawn_daemon(&block)
+ pid_pipe = IO.pipe
- return pid.to_i
- end
-
- # Very simple method to spawn a child process
- # <tt>
- # spawn_child do
- # puts "Hello from child!"
- # end
- # </tt>
- def self.spawn_child(&block)
- pid = fork do
- yield
+ fork do
+ Process.setsid
+ exit if fork
- exit!(0)
- end
+ # Send the pid back to the parent
+ pid_pipe[RD].close
+ pid_pipe[WR].write(Process.pid.to_s)
+ pid_pipe[WR].close
- return pid
- end
-
- # Open a process. Similar to IO.popen, but provides a much more generic interface to stdin, stdout,
- # stderr and the pid. We also attempt to tidy up as much as possible given some kind of error or
- # exception. You are expected to write to output, and read from input and error.
- #
- # = Options =
- #
- # We can specify a pipe that will be redirected to the current processes pipe. A typical one is
- # :err, so that errors in the child process are printed directly to $stderr of the parent process.
- # <tt>:passthrough => :err</tt>
- # <tt>:passthrough => [:in, :out, :err]</tt> or <tt>:passthrough => :all</tt>
- #
- # We can specify a set of pipes other than the standard ones for redirecting to other things, eg
- # <tt>:out => File.open("output.log", "a")</tt>
- #
- # If you need to supply a pipe manually, you can do that too:
- # <tt>:in => IO.pipe</tt>
- #
- # You can specify <tt>:daemon => true</tt> to cause the child process to detatch. In this
- # case you will generally want to specify files for <tt>:in, :out, :err</tt> e.g.
- # <tt>
- # :in => File.open("/dev/null"),
- # :out => File.open("/var/log/my.log", "a"),
- # :err => File.open("/var/log/my.err", "a")
- # </tt>
- def self.open(command, options = {}, &block)
- cin, cout, cerr = pipes_for_options(options)
- stdpipes = [STDIN, STDOUT, STDERR]
+ yield
- spawn = options[:daemonize] ? :spawn_daemon : :spawn_child
+ exit(0)
+ end
- cid = self.send(spawn) do
- [cin[WR], cout[RD], cerr[RD]].compact.each { |pipe| pipe.close }
-
- STDIN.reopen(cin[RD]) if cin[RD] and !stdpipes.include?(cin[RD])
- STDOUT.reopen(cout[WR]) if cout[WR] and !stdpipes.include?(cout[WR])
- STDERR.reopen(cerr[WR]) if cerr[WR] and !stdpipes.include?(cerr[WR])
-
- if command.respond_to? :call
- command.call
- else
- # If command is a Pathname, we need to convert it to an absolute path if possible,
- # otherwise if it is relative it might cause problems.
- if command.respond_to? :realpath
- command = command.realpath
- end
-
- if command.respond_to? :to_cmd
- exec(command.to_cmd)
- else
- exec(command.to_s)
- end
- end
- end
-
- # Don't close stdin, stdout, stderr.
- [cin[RD], cout[WR], cerr[WR]].compact.each { |pipe| pipe.close unless stdpipes.include?(pipe) }
+ pid_pipe[WR].close
+ pid = pid_pipe[RD].read
+ pid_pipe[RD].close
- task = Task.new(cin[WR], cout[RD], cerr[RD], cid)
+ return pid.to_i
+ end
- if block_given?
- begin
- yield task
- task.close_input
- return task.wait
- ensure
- task.stop
- end
- else
- return task
- end
- end
-
- def initialize(input, output, error, pid)
- @input = input
- @output = output
- @error = error
-
- @pid = pid
- @result = nil
- end
-
- attr :input
- attr :output
- attr :error
- attr :pid
- attr :result
-
- def running?
- return self.class.running?(@pid)
- end
-
- # Close all connections to the child process
- def close
- close_pipes(@input, @output, @error)
- end
-
- # Close input pipe to child process (if applicable)
- def close_input
- @input.close if @input and !@input.closed?
- end
-
- # Send a signal to the child process
- def kill(signal = "INT")
- Process.kill("INT", @pid)
- end
-
- # Wait for the child process to finish, return the exit status.
- def wait
- begin
- close_input
-
- _pid, @result = Process.wait2(@pid)
-
- dump_pipes(@output, @error)
- ensure
- close_pipes(@input, @output, @error)
- end
-
- return @result
- end
-
- # Forcefully stop the child process.
- def stop
- # The process has already been stoped/waited upon
- return if @result
-
- begin
- close_input
- kill
- wait
-
- dump_pipes(@output, @error)
- ensure
- close_pipes(@output, @error)
- end
- end
- end
+ # Very simple method to spawn a child process
+ # <tt>
+ # spawn_child do
+ # puts "Hello from child!"
+ # end
+ # </tt>
+ def self.spawn_child(&block)
+ pid = fork do
+ yield
+
+ exit!(0)
+ end
+
+ return pid
+ end
+
+ # Open a process. Similar to IO.popen, but provides a much more generic interface to stdin, stdout,
+ # stderr and the pid. We also attempt to tidy up as much as possible given some kind of error or
+ # exception. You are expected to write to output, and read from input and error.
+ #
+ # = Options =
+ #
+ # We can specify a pipe that will be redirected to the current processes pipe. A typical one is
+ # :err, so that errors in the child process are printed directly to $stderr of the parent process.
+ # <tt>:passthrough => :err</tt>
+ # <tt>:passthrough => [:in, :out, :err]</tt> or <tt>:passthrough => :all</tt>
+ #
+ # We can specify a set of pipes other than the standard ones for redirecting to other things, eg
+ # <tt>:out => File.open("output.log", "a")</tt>
+ #
+ # If you need to supply a pipe manually, you can do that too:
+ # <tt>:in => IO.pipe</tt>
+ #
+ # You can specify <tt>:daemonize => true</tt> to cause the child process to detatch. In this
+ # case you will generally want to specify files for <tt>:in, :out, :err</tt> e.g.
+ # <tt>
+ # :in => File.open("/dev/null"),
+ # :out => File.open("/var/log/my.log", "a"),
+ # :err => File.open("/var/log/my.err", "a")
+ # </tt>
+ def self.open(command, options = {}, &block)
+ cin, cout, cerr = pipes_for_options(options)
+ spawn = options[:daemonize] ? :spawn_daemon : :spawn_child
+
+ cid = self.send(spawn) do
+ close_pipes(cin[WR], cout[RD], cerr[RD])
+
+ STDIN.reopen(cin[RD]) if cin[RD]
+ STDOUT.reopen(cout[WR]) if cout[WR]
+ STDERR.reopen(cerr[WR]) if cerr[WR]
+
+ if command.respond_to? :call
+ command.call
+ elsif Array === command
+ # If command is a Pathname, we need to convert it to an absolute path if possible,
+ # otherwise if it is relative it might cause problems.
+ if command[0].respond_to? :realpath
+ command[0] = command[0].realpath
+ end
+
+ exec *command
+ else
+ if command.respond_to? :realpath
+ command = command.realpath
+ end
+
+ exec command.to_s
+ end
+ end
+
+ close_pipes(cin[RD], cout[WR], cerr[WR])
+
+ task = Task.new(cin[WR], cout[RD], cerr[RD], cid)
+
+ if block_given?
+ begin
+ yield task
+ task.close_input
+ return task.wait
+ ensure
+ task.close
+ end
+ else
+ return task
+ end
+ end
+
+ def initialize(input, output, error, pid)
+ @input = input
+ @output = output
+ @error = error
+
+ @pid = pid
+
+ @result = nil
+ @status = :running
+ @result_lock = Mutex.new
+ @result_available = ConditionVariable.new
+ end
+
+ attr :input
+ attr :output
+ attr :error
+ attr :pid
+ attr :result
+
+ # Returns true if the current task is still running
+ def running?
+ if self.class.running?(@pid)
+ # The pid still seems alive, check that it isn't some other process using the same pid...
+ @result_lock.synchronize do
+ # If we haven't waited for it yet, it must be either a running process or a zombie...
+ return @status != :stopped
+ end
+ end
+
+ return false
+ end
+
+ # Close all connections to the child process
+ def close
+ begin
+ self.class.dump_pipes(@output, @error)
+ ensure
+ self.class.close_pipes(@input, @output, @error)
+ end
+ end
+
+ # Close input pipe to child process (if applicable)
+ def close_input
+ @input.close if @input and !@input.closed?
+ end
+
+ # Send a signal to the child process
+ def kill(signal = "INT")
+ if running?
+ Process.kill(signal, @pid)
+ else
+ raise Errno::ECHILD
+ end
+ end
+
+ # Wait for the child process to finish, return the exit status.
+ # This function can be called from multiple threads.
+ def wait
+ begin_wait = false
+
+ # Check to see if some other caller is already waiting on the result...
+ @result_lock.synchronize do
+ case @status
+ when :waiting
+ # If so, wait for the wait to finish...
+ @result_available.wait(@result_lock)
+ when :running
+ # Else, mark that we should begin waiting...
+ begin_wait = true
+ @status = :waiting
+ when :stopped
+ return @result
+ end
+ end
+
+ # If we should begin waiting (the first thread to wait)...
+ if begin_wait
+ begin
+ # Wait for the result...
+ _pid, @result = Process.wait2(@pid)
+ end
+
+ # The result is now available...
+ @result_lock.synchronize do
+ @status = :stopped
+ end
+
+ # Notify other threads...
+ @result_available.broadcast()
+ end
+
+ # Return the result
+ return @result
+ end
+
+ # Forcefully stop the child process.
+ def stop
+ if running?
+ close_input
+ kill
+ end
+ end
+ end
end