lib/runnable.rb in runnable-0.2.4 vs lib/runnable.rb in runnable-0.3.0
- old
+ new
@@ -22,214 +22,206 @@
# Convert a executable command in a Ruby-like class
# you are able to start, define params and send signals (like kill, or stop)
#
# @example Usage:
-# class LS < Runnable
+# class LS
+# include Runnable
+#
+# executes :ls
# command_style :extended
# end
#
# ls = LS.new
# ls.alh
# ls.run
-class Runnable
+module Runnable
+ def self.included(klass)
+ klass.extend ClassMethods
+ end
+
+ module ClassMethods
+ # Define the command to be executed
+ # @return [nil]
+ # @param [Symbol] command Command to be executed
+ def executes( cmd )
+ define_method( :command ) { cmd }
+ end
+
+ # Define the parameter style to be used.
+ # @return [nil]
+ def command_style( style )
+ define_method( :command_style ) { style }
+ end
+
+ # Create a user definde command
+ # @return [nil]
+ # @param [Symbol] name The user defined command name
+ # @param [Hash] options Options.
+ # @option options :blocking (false) Describe if the execution is blocking or non-blocking
+ # @option options :log_path (false) Path used to store logs # (dont store logs if no path specified)
+ def define_command( name, opts = {}, &block )
+ blocking = opts[:blocking] || false
+ log_path = opts[:log_path] || false
+
+ commands[name] = { :blocking => blocking }
+
+ define_method( name ) do |*args|
+ run name, block.call(*args), log_path
+ join if blocking
+ end
+ end
+
+ # Generic command processor. It allows to define generic processors used in all the
+ # user defined commands
+ # @param [Hash] opts Processing options
+ # @option opts :outputs (nil) Output processing Hash (regexp => output)
+ # @option opts :exceptions (nil) Exceptions processing Hash (regexp => exception)
+ def processors( opts = nil )
+ if opts.is_a? Hash
+ @processors = opts
+ else
+ @processors ||= Hash.new
+ end
+ end
+
+ # Method missing processing for the command processors
+ def method_missing( name, *opts )
+ raise NoMethodError.new( name.to_s ) unless name.to_s =~ /([a-z]*)_([a-z]*)/
+
+ # command_processors
+ if $2 == "processors"
+ commands[$1.to_sym][:outputs] = opts.first[:outputs]
+ commands[$1.to_sym][:exceptions] = opts.first[:exceptions]
+ end
+ end
+
+ # @group Accessors for the module class variables
+
+ # Returns the user defined commands
+ # @return [Hash] commands User defined commands
+ def commands
+ @commands ||= Hash.new
+ end
+
+ # Returns the list of runnable instances by pid
+ # @return [Hash] list of runnable instances by pid
+ def processes
+ @processes ||= Hash.new
+ end
+
+ # Processes writer
+ def processes=( value )
+ @processes = value
+ end
+ end
+
# Process id.
attr_reader :pid
# Process owner.
attr_reader :owner
# Process group.
attr_reader :group
# Directory where process was called from.
attr_reader :pwd
- # Input file
- attr_accessor :input
+ # Process output
+ attr_reader :output
- # Set the output file
- attr_accessor :output
+ # Process options
+ attr_accessor :options
+ # Process log output
+ attr_accessor :log_path
# Metaprogramming part of the class
-
- # Define the parameter style to be used.
- # @return [nil]
- def self.command_style( style )
- define_method( :command_style ) do
- style
- end
- end
-
+
# Parameter style used for the command.
# @return [Symbol] Command style.
def command_style
:gnu
end
- # List of runnable instances running on the system order by pid.
- @@processes = Hash.new
+ # Default command to be executed
+ # @return [String] Command to be executed
+ def command
+ self.class.to_s.split( "::" ).last.downcase
+ end
# Constant to calculate cpu usage.
HERTZ = 100
- # Create a new instance of a runnable command.
- # @param [Hash] option_hash Options.
- # @option option_hash :delete_log (true) Delete the log after execution.
- # @option option_hash :command_options ("") Command options.
- # @option option_hash :log_path ("/var/log/runnable") Path for the log files.
- def initialize( option_hash = {} )
- # keys :delete_log
- # :command_options
- # :log_path
-
- # If we have the command class in a namespace, we need to remove
- # the namespace name
- @command = self.class.to_s.split( "::" ).last.downcase
-
- # Set the default command option
- # Empty by default
- option_hash[:command_options] ||= ""
- @options = option_hash[:command_options]
-
- # Set the log path
- # Default path is "/var/log/runnable"
- option_hash[:log_path] ||= "/var/log/runnable/"
- @log_path = option_hash[:log_path]
-
- # Set the delete_log option
- # true by default
- if option_hash[:delete_log] == nil
- @delete_log = true
- else
- @delete_log = option_hash[:delete_log]
- end
-
- # Store input options
- @input = String.new
-
- # Store output options
- @output = String.new
-
- # Standar Outputs
- @std_output = {
- :out => "",
- :err => ""
- }
-
- # @todo: checks that command is in the PATH
- # ...
-
- # we dont set the pid, because we dont know until run
- @pid = nil
- @excep_array = []
-
-
- # Metaprogramming part
- # Create a new instance of the parser class
- @command_line_interface = Object.const_get( command_style.to_s.capitalize.to_sym ).new
- # End Metaprogramming part
-
- #End of initialize instance variables
-
- create_log_directory
- end
-
# Start the execution of the command.
# @return [nil]
- def run
+ def run(name = nil, opts = nil, log_path = nil)
+ return false if @pid
# Create a new mutex
@pid_mutex = Mutex.new
+
+ # Log path should be an instance variable to avoid a mess
+ @log_path = log_path || @log_path
# Create pipes to redirect Standar I/O
out_rd, out_wr = IO.pipe
# Redirect Error I/O
err_rd, err_wr = IO.pipe
-
+
# Reset exceptions array to not store exceptions for
# past executions
- @excep_array = []
+ command_argument = opts ? opts.split(" ") : compose_command
- # Set up the command line
- command = []
- #command << @command
- command << @input.to_s
- command << @options.to_s
- command << @command_line_interface.parse
- command << @output.to_s
- #command = command.join( " " )
- command.flatten!
-
- command = command.select do |value|
- !value.to_s.strip.empty?
- end
-=begin
- # Debugging purpose
- puts "I: #{@input}"
- puts "OP: #{@options}"
- puts "CLI: #{@command_line_interface.parse}"
- puts "O: #{@output}"
- puts "C: #{command}"
-=end
- #@pid = Process.spawn( command, { :out => out_wr, :err => err_wr } )
- @pid = Process.spawn( @command, *command, { :out => out_wr, :err => err_wr } )
+ @pid = Process.spawn( command.to_s, *command_argument, { :out => out_wr, :err => err_wr } )
# Include instance in class variable
- @@processes[@pid] = self
+ self.class.processes[@pid] = self
# Prepare the process info file to be read
file_status = File.open( "/proc/#{@pid}/status" ).read.split( "\n" )
# Owner: Read the owner of the process from /proc/@pid/status
@owner = file_status[6].split( " " )[1]
# Group: Read the Group owner from /proc/@pid/status
@group = file_status[7].split( " " )[1]
-
+
# Set @output_thread with new threads
# wich execute the input/ouput loop
- create_logs(:out => [out_wr, out_rd], :err => [err_wr, err_rd])
-
- # Create a new thread to avoid blocked processes
- @run_thread = Thread.new do
- # Wait to get the pid process even if it has finished
- Process.wait( @pid, Process::WUNTRACED )
+ stream_info = {
+ :out => [out_wr, out_rd],
+ :err => [err_wr, err_rd]
+ }
- # Wait each I/O thread
- @output_threads.each { |thread| thread.join }
- # Delete log if its necesary
- delete_log
-
- # Get the exit code from command
- exit_status = $?.exitstatus
-
- # In case of error add an Exception to the @excep_array
- @excep_array << SystemCallError.new( exit_status ) if exit_status != 0
-
- # Call methods according to the exit code
- if @excep_array.empty?
- finish
- else
- failed( @excep_array )
- end
-
- # This instance is finished and we remove it
- @@processes.delete( @pid )
+ if name
+ cmd_info = self.class.commands[name]
+ stream_processors = {
+ :outputs => cmd_info[:outputs],
+ :exceptions => cmd_info[:exceptions]
+ }
end
-
+
+ output_threads = process_streams( stream_info, stream_processors )
+
+ # Create a new thread to avoid blocked processes
+ @run_thread = threaded_process(@pid, output_threads)
+
# Satuts Variables
# PWD: Current Working Directory get by /proc/@pid/cwd
# @rescue If a fast process is runned there isn't time to get
# the correct PWD. If the readlink fails, we retry, if the process still alive
# until the process finish.
+
begin
- @pwd = File.readlink( "/proc/#{@pid}/cwd" )
- rescue
+ @pwd ||= File.readlink( "/proc/#{@pid}/cwd" )
+ rescue Errno::ENOENT
# If cwd is not available rerun @run_thread
if @run_thread.alive?
#If it is alive, we retry to get cwd
@run_thread.run
retry
else
#If process has terminated, we set pwd to current working directory of ruby
@pwd = Dir.getwd
end
+ rescue #Errno::EACCESS
+ @pwd = Dir.getwd
end
end
# Stop the command.
# @return [nil]
@@ -255,10 +247,11 @@
# Wait for command thread to finish it execution.
# @return [nil]
def join
@run_thread.join if @run_thread.alive?
+ @output unless @output.empty?
end
# Check if prcess is running on the system.
# @return [Bool] True if process is running, false if it is not.
def running?
@@ -266,19 +259,31 @@
end
# Standar output of command
# @return [String] Standar output
def std_out
- @std_output[:out]
+ @std_out ||= ""
end
# Standar error output of the command
# @return [String] Standar error output
def std_err
- @std_output[:err]
+ @std_err ||= ""
end
+ # Sets the command input to be passed to the command execution
+ # @param [String] opt Command input
+ def input=( opt )
+ @command_input = opt
+ end
+
+ # Sets the command output to be passed to the command execution
+ # @param [String] opt Command output
+ def output=( opt )
+ @command_output = opt
+ end
+
# Calculate the estimated memory usage in Kb.
# @return [Number] Estimated mem usage in Kb.
def mem
File.open( "/proc/#{@pid}/status" ).read.split( "\n" )[11].split( " " )[1].to_i
end
@@ -313,11 +318,10 @@
0
rescue ZeroDivisionError
# Seconds is Zero!
0
end
-
end
# Estimated bandwidth in kb/s.
# @param [String] iface Interface to be scaned.
# @param [Number] sample_time Time passed between samples in seconds.
@@ -352,64 +356,32 @@
# @param [Symbol] method Method called that is missing
# @param [Array] params Params in the call
# @param [Block] block Block code in method
# @return [nil]
def method_missing( method, *params, &block )
+ @command_line_interface ||= Object.const_get( command_style.to_s.capitalize.to_sym ).new
+
if params.length > 1
super( method, params, block )
else
if params[0].class == Hash
# If only one param is passed and its a Hash
# we need to expand the hash and call each key as a method with value as params
# @see parse_hash for more information
parse_hash( params[0] )
else
- @command_line_interface.add_param( method.to_s,
- params != nil ? params.join(",") : nil )
+ @command_line_interface.add_param( method.to_s, params != nil ? params.join(",") : nil )
end
end
end
# List of runnable instances running on the system.
# @return [Hash] Using process pids as keys and instances as values.
def self.processes
@@processes
end
- # @abstract
- # Returns a hash of regular expressions and exceptions associated to them.
- # Command output is match against those regular expressions, if it does match
- # an appropiate exception is included in the return value of execution.
- # @note This method should be overwritten in child classes.
- # @example Usage:
- # class ls < Runnable
- # def exceptions
- # { /ls: (invalid option.*)/ => ArgumentError }
- # end
- # end
- #
- # @return [Hash] Using regular expressions as keys and exceptions that should
- # be raised as values.
- def exceptions
- {}
- end
-
- # @abstract
- # Method called when command ends with no erros.
- # This method is a hook so it should be overwritten in child classes.
- # @return [nil]
- def finish
- end
-
- # @abstract
- # Method called when command executions fail.
- # This method is a hook so it should be overwritten in child classes.
- # @param [Array] exceptions Array containing exceptions raised during the command execution.
- # @return [nil]
- def failed( exceptions )
- end
-
# Send the desired signal to the command.
# @param [Symbol] Signal to be send to the command.
# @todo raise ESRCH if pid is not in system
# or EPERM if pid is not from user.
def send_signal( signal )
@@ -435,63 +407,109 @@
# As we kill child processes, main process may have exit already
end
end
protected
- # Redirect command I/O to log files.
+ # Process the command I/O.
# These files are located in /var/log/runnable.
# @param [Hash] Outputs options.
# @option outputs stream [Symbol] Stream name.
# @option outputs pipes [IO] I/O stream to be redirected.
- # @return [nil]
- def create_logs( outputs = {} )
- # Create an empty file for logging
- FileUtils.touch "#{@log_path}#{@command}_#{@pid}.log"
+ # @return [Array] output_threads Array containing the output processing threads
+ def process_streams( output_streams = {}, stream_processors = nil )
+ @output = Hash.new
+ @std_output = Hash.new
- @output_threads = []
+ output_threads = []
# for each io stream we create a thread wich read that
# stream and write it in a log file
- outputs.each do |output_name, pipes|
- @output_threads << Thread.new do
- pipes[0].close
-
- @std_output[output_name] = ""
-
- pipes[1].each_line do |line|
- @std_output[output_name] << line
-
- File.open("#{@log_path}#{@command}_#{@pid}.log", "a") do |log_file|
- log_file.puts( "[#{Time.new.inspect} || [STD#{output_name.to_s.upcase} || [#{@pid}]] #{line}" )
- end
- # Match custom exceptions
- # if we get a positive match, add it to the exception array
- # in order to inform the user of what had happen
- exceptions.each do | reg_expr, value |
- @excep_array<< value.new( $1 ) if reg_expr =~ line
- end
- end
- end
+ output_streams.collect do |output_name, pipes|
+ threaded_output_processor(output_name, pipes, stream_processors)
end
end
- def create_log_directory
- Dir.mkdir( @log_path ) unless Dir.exist?( @log_path )
- end
-
- def delete_log
- File.delete( "#{@log_path}#{@command}_#{@pid}.log" ) if @delete_log == true
- end
-
# Expand a parameter hash calling each key as method and value as param
# forcing method misssing to be called.
# @param [Hash] hash Parameters to be expand and included in command execution
# @return [nil]
def parse_hash( hash )
hash.each do |key, value|
# Add the param parsed to command_line_interface
- @command_line_interface.add_param(
- key.to_s,
- value != nil ? value.to_s : nil
- )
+ @command_line_interface.add_param( key.to_s, value != nil ? value.to_s : nil )
end
end
+
+ private
+
+ def save_log(output_name, line)
+ Dir.mkdir( @log_path ) unless Dir.exist?( @log_path )
+
+ File.open("#{@log_path}/#{self.command}_#{@pid}.log", "a") do |log_file|
+ log_file.puts( "[#{Time.new.inspect} || [STD#{output_name.to_s.upcase} || [#{@pid}]] #{line}" )
+ end
+ end
+
+ def compose_command
+ @command_line_interface ||= Object.const_get( command_style.to_s.capitalize.to_sym ).new
+
+ [ @command_input.to_s,
+ @options.to_s,
+ @command_line_interface.parse,
+ @command_output.to_s
+ ].select do |value|
+ !value.to_s.strip.empty?
+ end.flatten.select{|x| !x.empty?}
+ end
+
+ def threaded_process(pid, output_threads)
+ Thread.new do
+ # Wait to get the pid process even if it has finished
+ Process.wait( pid, Process::WUNTRACED )
+
+ # Wait each I/O thread
+ output_threads.each { |thread| thread.join }
+
+ # Get the exit code from command
+ exit_status = $?.exitstatus
+
+ # This instance is finished and we remove it
+ self.class.processes.delete( pid )
+ @pid = nil
+
+ # In case of error add an Exception to the @excep_array
+ raise SystemCallError.new( exit_status ) if exit_status != 0
+ end
+ end
+
+ def threaded_output_processor(output_name, pipes, stream_processors)
+ exception_processors = stream_processors.is_a?(Hash) ? stream_processors[:exceptions] : {}
+ exception_processors.merge!(self.class.processors[:exceptions] || {})
+
+ output_processors = stream_processors.is_a?(Hash) ? stream_processors[:outputs] : {}
+ output_processors.merge!(self.class.processors[:output] || {})
+
+ Thread.new do
+ pipes[0].close
+
+ pipes[1].each_line do |line|
+ ( output_name == :err ? self.std_err : self.std_out ) << line
+
+ save_log(output_name, line) if @log_path
+
+ # Match custom exceptions
+ # if we get a positive match, raise the exception
+ exception_processors.each do | reg_expr, value |
+ raise value.new( line ) if reg_expr =~ line
+ end
+
+ # Match custom outputs
+ # if we get a positive match, add it to the outputs array
+ output_processors.each do | reg_expr, value |
+ @output[value] ||= Array.new
+ @output[value] << $1 if reg_expr =~ line
+ end
+
+ end
+ end
+ end
+
end