lib/runnable.rb in runnable-0.2.4 vs lib/runnable.rb in runnable-0.3.0

- old
+ new

@@ -22,214 +22,206 @@ # Convert a executable command in a Ruby-like class # you are able to start, define params and send signals (like kill, or stop) # # @example Usage: -# class LS < Runnable +# class LS +# include Runnable +# +# executes :ls # command_style :extended # end # # ls = LS.new # ls.alh # ls.run -class Runnable +module Runnable + def self.included(klass) + klass.extend ClassMethods + end + + module ClassMethods + # Define the command to be executed + # @return [nil] + # @param [Symbol] command Command to be executed + def executes( cmd ) + define_method( :command ) { cmd } + end + + # Define the parameter style to be used. + # @return [nil] + def command_style( style ) + define_method( :command_style ) { style } + end + + # Create a user definde command + # @return [nil] + # @param [Symbol] name The user defined command name + # @param [Hash] options Options. + # @option options :blocking (false) Describe if the execution is blocking or non-blocking + # @option options :log_path (false) Path used to store logs # (dont store logs if no path specified) + def define_command( name, opts = {}, &block ) + blocking = opts[:blocking] || false + log_path = opts[:log_path] || false + + commands[name] = { :blocking => blocking } + + define_method( name ) do |*args| + run name, block.call(*args), log_path + join if blocking + end + end + + # Generic command processor. It allows to define generic processors used in all the + # user defined commands + # @param [Hash] opts Processing options + # @option opts :outputs (nil) Output processing Hash (regexp => output) + # @option opts :exceptions (nil) Exceptions processing Hash (regexp => exception) + def processors( opts = nil ) + if opts.is_a? Hash + @processors = opts + else + @processors ||= Hash.new + end + end + + # Method missing processing for the command processors + def method_missing( name, *opts ) + raise NoMethodError.new( name.to_s ) unless name.to_s =~ /([a-z]*)_([a-z]*)/ + + # command_processors + if $2 == "processors" + commands[$1.to_sym][:outputs] = opts.first[:outputs] + commands[$1.to_sym][:exceptions] = opts.first[:exceptions] + end + end + + # @group Accessors for the module class variables + + # Returns the user defined commands + # @return [Hash] commands User defined commands + def commands + @commands ||= Hash.new + end + + # Returns the list of runnable instances by pid + # @return [Hash] list of runnable instances by pid + def processes + @processes ||= Hash.new + end + + # Processes writer + def processes=( value ) + @processes = value + end + end + # Process id. attr_reader :pid # Process owner. attr_reader :owner # Process group. attr_reader :group # Directory where process was called from. attr_reader :pwd - # Input file - attr_accessor :input + # Process output + attr_reader :output - # Set the output file - attr_accessor :output + # Process options + attr_accessor :options + # Process log output + attr_accessor :log_path # Metaprogramming part of the class - - # Define the parameter style to be used. - # @return [nil] - def self.command_style( style ) - define_method( :command_style ) do - style - end - end - + # Parameter style used for the command. # @return [Symbol] Command style. def command_style :gnu end - # List of runnable instances running on the system order by pid. - @@processes = Hash.new + # Default command to be executed + # @return [String] Command to be executed + def command + self.class.to_s.split( "::" ).last.downcase + end # Constant to calculate cpu usage. HERTZ = 100 - # Create a new instance of a runnable command. - # @param [Hash] option_hash Options. - # @option option_hash :delete_log (true) Delete the log after execution. - # @option option_hash :command_options ("") Command options. - # @option option_hash :log_path ("/var/log/runnable") Path for the log files. - def initialize( option_hash = {} ) - # keys :delete_log - # :command_options - # :log_path - - # If we have the command class in a namespace, we need to remove - # the namespace name - @command = self.class.to_s.split( "::" ).last.downcase - - # Set the default command option - # Empty by default - option_hash[:command_options] ||= "" - @options = option_hash[:command_options] - - # Set the log path - # Default path is "/var/log/runnable" - option_hash[:log_path] ||= "/var/log/runnable/" - @log_path = option_hash[:log_path] - - # Set the delete_log option - # true by default - if option_hash[:delete_log] == nil - @delete_log = true - else - @delete_log = option_hash[:delete_log] - end - - # Store input options - @input = String.new - - # Store output options - @output = String.new - - # Standar Outputs - @std_output = { - :out => "", - :err => "" - } - - # @todo: checks that command is in the PATH - # ... - - # we dont set the pid, because we dont know until run - @pid = nil - @excep_array = [] - - - # Metaprogramming part - # Create a new instance of the parser class - @command_line_interface = Object.const_get( command_style.to_s.capitalize.to_sym ).new - # End Metaprogramming part - - #End of initialize instance variables - - create_log_directory - end - # Start the execution of the command. # @return [nil] - def run + def run(name = nil, opts = nil, log_path = nil) + return false if @pid # Create a new mutex @pid_mutex = Mutex.new + + # Log path should be an instance variable to avoid a mess + @log_path = log_path || @log_path # Create pipes to redirect Standar I/O out_rd, out_wr = IO.pipe # Redirect Error I/O err_rd, err_wr = IO.pipe - + # Reset exceptions array to not store exceptions for # past executions - @excep_array = [] + command_argument = opts ? opts.split(" ") : compose_command - # Set up the command line - command = [] - #command << @command - command << @input.to_s - command << @options.to_s - command << @command_line_interface.parse - command << @output.to_s - #command = command.join( " " ) - command.flatten! - - command = command.select do |value| - !value.to_s.strip.empty? - end -=begin - # Debugging purpose - puts "I: #{@input}" - puts "OP: #{@options}" - puts "CLI: #{@command_line_interface.parse}" - puts "O: #{@output}" - puts "C: #{command}" -=end - #@pid = Process.spawn( command, { :out => out_wr, :err => err_wr } ) - @pid = Process.spawn( @command, *command, { :out => out_wr, :err => err_wr } ) + @pid = Process.spawn( command.to_s, *command_argument, { :out => out_wr, :err => err_wr } ) # Include instance in class variable - @@processes[@pid] = self + self.class.processes[@pid] = self # Prepare the process info file to be read file_status = File.open( "/proc/#{@pid}/status" ).read.split( "\n" ) # Owner: Read the owner of the process from /proc/@pid/status @owner = file_status[6].split( " " )[1] # Group: Read the Group owner from /proc/@pid/status @group = file_status[7].split( " " )[1] - + # Set @output_thread with new threads # wich execute the input/ouput loop - create_logs(:out => [out_wr, out_rd], :err => [err_wr, err_rd]) - - # Create a new thread to avoid blocked processes - @run_thread = Thread.new do - # Wait to get the pid process even if it has finished - Process.wait( @pid, Process::WUNTRACED ) + stream_info = { + :out => [out_wr, out_rd], + :err => [err_wr, err_rd] + } - # Wait each I/O thread - @output_threads.each { |thread| thread.join } - # Delete log if its necesary - delete_log - - # Get the exit code from command - exit_status = $?.exitstatus - - # In case of error add an Exception to the @excep_array - @excep_array << SystemCallError.new( exit_status ) if exit_status != 0 - - # Call methods according to the exit code - if @excep_array.empty? - finish - else - failed( @excep_array ) - end - - # This instance is finished and we remove it - @@processes.delete( @pid ) + if name + cmd_info = self.class.commands[name] + stream_processors = { + :outputs => cmd_info[:outputs], + :exceptions => cmd_info[:exceptions] + } end - + + output_threads = process_streams( stream_info, stream_processors ) + + # Create a new thread to avoid blocked processes + @run_thread = threaded_process(@pid, output_threads) + # Satuts Variables # PWD: Current Working Directory get by /proc/@pid/cwd # @rescue If a fast process is runned there isn't time to get # the correct PWD. If the readlink fails, we retry, if the process still alive # until the process finish. + begin - @pwd = File.readlink( "/proc/#{@pid}/cwd" ) - rescue + @pwd ||= File.readlink( "/proc/#{@pid}/cwd" ) + rescue Errno::ENOENT # If cwd is not available rerun @run_thread if @run_thread.alive? #If it is alive, we retry to get cwd @run_thread.run retry else #If process has terminated, we set pwd to current working directory of ruby @pwd = Dir.getwd end + rescue #Errno::EACCESS + @pwd = Dir.getwd end end # Stop the command. # @return [nil] @@ -255,10 +247,11 @@ # Wait for command thread to finish it execution. # @return [nil] def join @run_thread.join if @run_thread.alive? + @output unless @output.empty? end # Check if prcess is running on the system. # @return [Bool] True if process is running, false if it is not. def running? @@ -266,19 +259,31 @@ end # Standar output of command # @return [String] Standar output def std_out - @std_output[:out] + @std_out ||= "" end # Standar error output of the command # @return [String] Standar error output def std_err - @std_output[:err] + @std_err ||= "" end + # Sets the command input to be passed to the command execution + # @param [String] opt Command input + def input=( opt ) + @command_input = opt + end + + # Sets the command output to be passed to the command execution + # @param [String] opt Command output + def output=( opt ) + @command_output = opt + end + # Calculate the estimated memory usage in Kb. # @return [Number] Estimated mem usage in Kb. def mem File.open( "/proc/#{@pid}/status" ).read.split( "\n" )[11].split( " " )[1].to_i end @@ -313,11 +318,10 @@ 0 rescue ZeroDivisionError # Seconds is Zero! 0 end - end # Estimated bandwidth in kb/s. # @param [String] iface Interface to be scaned. # @param [Number] sample_time Time passed between samples in seconds. @@ -352,64 +356,32 @@ # @param [Symbol] method Method called that is missing # @param [Array] params Params in the call # @param [Block] block Block code in method # @return [nil] def method_missing( method, *params, &block ) + @command_line_interface ||= Object.const_get( command_style.to_s.capitalize.to_sym ).new + if params.length > 1 super( method, params, block ) else if params[0].class == Hash # If only one param is passed and its a Hash # we need to expand the hash and call each key as a method with value as params # @see parse_hash for more information parse_hash( params[0] ) else - @command_line_interface.add_param( method.to_s, - params != nil ? params.join(",") : nil ) + @command_line_interface.add_param( method.to_s, params != nil ? params.join(",") : nil ) end end end # List of runnable instances running on the system. # @return [Hash] Using process pids as keys and instances as values. def self.processes @@processes end - # @abstract - # Returns a hash of regular expressions and exceptions associated to them. - # Command output is match against those regular expressions, if it does match - # an appropiate exception is included in the return value of execution. - # @note This method should be overwritten in child classes. - # @example Usage: - # class ls < Runnable - # def exceptions - # { /ls: (invalid option.*)/ => ArgumentError } - # end - # end - # - # @return [Hash] Using regular expressions as keys and exceptions that should - # be raised as values. - def exceptions - {} - end - - # @abstract - # Method called when command ends with no erros. - # This method is a hook so it should be overwritten in child classes. - # @return [nil] - def finish - end - - # @abstract - # Method called when command executions fail. - # This method is a hook so it should be overwritten in child classes. - # @param [Array] exceptions Array containing exceptions raised during the command execution. - # @return [nil] - def failed( exceptions ) - end - # Send the desired signal to the command. # @param [Symbol] Signal to be send to the command. # @todo raise ESRCH if pid is not in system # or EPERM if pid is not from user. def send_signal( signal ) @@ -435,63 +407,109 @@ # As we kill child processes, main process may have exit already end end protected - # Redirect command I/O to log files. + # Process the command I/O. # These files are located in /var/log/runnable. # @param [Hash] Outputs options. # @option outputs stream [Symbol] Stream name. # @option outputs pipes [IO] I/O stream to be redirected. - # @return [nil] - def create_logs( outputs = {} ) - # Create an empty file for logging - FileUtils.touch "#{@log_path}#{@command}_#{@pid}.log" + # @return [Array] output_threads Array containing the output processing threads + def process_streams( output_streams = {}, stream_processors = nil ) + @output = Hash.new + @std_output = Hash.new - @output_threads = [] + output_threads = [] # for each io stream we create a thread wich read that # stream and write it in a log file - outputs.each do |output_name, pipes| - @output_threads << Thread.new do - pipes[0].close - - @std_output[output_name] = "" - - pipes[1].each_line do |line| - @std_output[output_name] << line - - File.open("#{@log_path}#{@command}_#{@pid}.log", "a") do |log_file| - log_file.puts( "[#{Time.new.inspect} || [STD#{output_name.to_s.upcase} || [#{@pid}]] #{line}" ) - end - # Match custom exceptions - # if we get a positive match, add it to the exception array - # in order to inform the user of what had happen - exceptions.each do | reg_expr, value | - @excep_array<< value.new( $1 ) if reg_expr =~ line - end - end - end + output_streams.collect do |output_name, pipes| + threaded_output_processor(output_name, pipes, stream_processors) end end - def create_log_directory - Dir.mkdir( @log_path ) unless Dir.exist?( @log_path ) - end - - def delete_log - File.delete( "#{@log_path}#{@command}_#{@pid}.log" ) if @delete_log == true - end - # Expand a parameter hash calling each key as method and value as param # forcing method misssing to be called. # @param [Hash] hash Parameters to be expand and included in command execution # @return [nil] def parse_hash( hash ) hash.each do |key, value| # Add the param parsed to command_line_interface - @command_line_interface.add_param( - key.to_s, - value != nil ? value.to_s : nil - ) + @command_line_interface.add_param( key.to_s, value != nil ? value.to_s : nil ) end end + + private + + def save_log(output_name, line) + Dir.mkdir( @log_path ) unless Dir.exist?( @log_path ) + + File.open("#{@log_path}/#{self.command}_#{@pid}.log", "a") do |log_file| + log_file.puts( "[#{Time.new.inspect} || [STD#{output_name.to_s.upcase} || [#{@pid}]] #{line}" ) + end + end + + def compose_command + @command_line_interface ||= Object.const_get( command_style.to_s.capitalize.to_sym ).new + + [ @command_input.to_s, + @options.to_s, + @command_line_interface.parse, + @command_output.to_s + ].select do |value| + !value.to_s.strip.empty? + end.flatten.select{|x| !x.empty?} + end + + def threaded_process(pid, output_threads) + Thread.new do + # Wait to get the pid process even if it has finished + Process.wait( pid, Process::WUNTRACED ) + + # Wait each I/O thread + output_threads.each { |thread| thread.join } + + # Get the exit code from command + exit_status = $?.exitstatus + + # This instance is finished and we remove it + self.class.processes.delete( pid ) + @pid = nil + + # In case of error add an Exception to the @excep_array + raise SystemCallError.new( exit_status ) if exit_status != 0 + end + end + + def threaded_output_processor(output_name, pipes, stream_processors) + exception_processors = stream_processors.is_a?(Hash) ? stream_processors[:exceptions] : {} + exception_processors.merge!(self.class.processors[:exceptions] || {}) + + output_processors = stream_processors.is_a?(Hash) ? stream_processors[:outputs] : {} + output_processors.merge!(self.class.processors[:output] || {}) + + Thread.new do + pipes[0].close + + pipes[1].each_line do |line| + ( output_name == :err ? self.std_err : self.std_out ) << line + + save_log(output_name, line) if @log_path + + # Match custom exceptions + # if we get a positive match, raise the exception + exception_processors.each do | reg_expr, value | + raise value.new( line ) if reg_expr =~ line + end + + # Match custom outputs + # if we get a positive match, add it to the outputs array + output_processors.each do | reg_expr, value | + @output[value] ||= Array.new + @output[value] << $1 if reg_expr =~ line + end + + end + end + end + end