require 'date'
require 'fileutils'
require 'yaml'
require 'nera_db_folders'
require 'nera_simulator_records'
require 'nera_parameter_records'
require 'nera_run_records'
require 'nera_job_records'
require 'nera_job_script'

module NERA

  # Controller for the job layer: lists jobs that are still registered in the
  # jobs table, cancels them, launches their job scripts on the local machine
  # and includes (imports) the result archives found in the include layer.
  #
  # Instance variables:
  #   @db_folders  : instance of NERA::DbFolders
  #   @job_records : instance of NERA::JobRecords
  class JobLayerController

    # path_db_folder : String handed to NERA::DbFolders.new.
    # Raises ArgumentError when path_db_folder is not a String.
    def initialize( path_db_folder)
      raise ArgumentError unless path_db_folder.is_a?(String)
      @db_folders = NERA::DbFolders.new( path_db_folder)
      @job_records = NERA::JobRecords.new( @db_folders.path_to_jobs_table)
      @job_records.set_yaml_file( @db_folders.path_to_jobs_yaml)
    end

    public

    # Returns the job layer path as reported by @db_folders.
    def path_to_job_layer
      return @db_folders.path_to_job_layer
    end

    # Returns [header, rows]: a fixed, abbreviated header line and one
    # ", "-joined String per record of @job_records.list_all.
    # DateTime values are cut down to their date part (text before 'T').
    def not_finished_list_in_csv
      list = @job_records.list_all
      header = @job_records.keys.dup
      header_ab = "id, state, simulator, param_id, run_id, num_runs, created_at, updated_at, host_name"
      csv_list = list.map do |r_hash|
        header.map do |key|
          value = r_hash[key]
          value.is_a?(DateTime) ? value.to_s.split('T')[0] : value.to_s
        end.join(", ")
      end
      return header_ab, csv_list
    end

    # Cancels the given jobs. For every id that @job_records.destroy accepts,
    # the job id is also removed from the matching run records and the job
    # script file is deleted.
    #
    # job_ids : Array of Integer
    # Returns the Array of canceled job ids, or nil when nothing was canceled.
    # Raises ArgumentError for a malformed job_ids and RuntimeError when the
    # tables are inconsistent with each other.
    def cancel_jobs( job_ids)
      check_job_ids( job_ids)

      destroyed_jobids = []
      @job_records.transaction {
        job_ids.each do |jid|
          d = @job_records.destroy( jid)
          next unless d   # no such job in the table
          sim_recs = NERA::SimulatorRecords.new( @db_folders.path_to_simulators_table)
          sim_recs.set_yaml_file( @db_folders.path_to_simulators_yaml )
          found = sim_recs.list.find { |item| item[:name] == d[:simulator] }
          # fail with the same explicit error as the class lookup below
          # instead of a NoMethodError on nil
          raise RuntimeError, "must not happen" unless found
          sim_id = found[:id]
          klass = NERA::Simulator.inherited_simulators.find { |sim| sim.to_s == d[:simulator] }
          raise RuntimeError, "must not happen" unless klass
          p_tab_path = @db_folders.path_to_parameters_table( klass)
          param_recs = NERA::ParameterRecords.new( p_tab_path, sim_recs, sim_id)
          param_recs.set_yaml_file( @db_folders.path_to_parameters_yaml( klass) )
          run_records = NERA::RunRecords.new( @db_folders.path_to_runs_table( klass, d[:parameter_id]),
                                              param_recs, d[:parameter_id] )
          run_records.set_yaml_file( @db_folders.path_to_runs_yaml( klass, d[:parameter_id]) )
          run_records.transaction {
            a = run_records.destroy_job_id( jid)
            raise RuntimeError, "must not happen" unless a
            FileUtils.rm( @db_folders.path_to_job_script(jid) )
          }
          destroyed_jobids << jid
          @db_folders.logger.info(self.class.to_s) {
            "canceled a job (jobid:#{jid})"
          }
        end
      }
      destroyed_jobids = nil if destroyed_jobids.size == 0
      return destroyed_jobids
    end

    # Starts the job scripts of the given jobs on the local machine.
    #
    # Ids not present in @job_records are ignored. A job is skipped when its
    # result directory or its .tar.bz2 archive already exists in the include
    # layer. Every job that is started is passed to
    # @job_records.update_to_state_submitted with host "localhost".
    #
    # job_ids : Array of Integer
    # Returns the Array of started job ids, or nil when nothing was started.
    # Raises ArgumentError for a malformed job_ids.
    def execute_jobs_now( job_ids)
      check_job_ids( job_ids)

      submitted_jobs = []
      @job_records.transaction {
        jids = job_ids.uniq.select { |jid| @job_records.find_by_id(jid) }
        return nil if jids.size == 0

        jids.each do |jid|
          js_path = File.expand_path( @db_folders.path_to_job_script(jid) )
          FileUtils.chmod(0744, js_path)
          job_name = File.basename( js_path)
          result_dir = @db_folders.path_to_include_layer + job_name.sub(/\.sh$/,'')
          result_archive = @db_folders.path_to_include_layer + job_name.sub(/\.sh$/,'.tar.bz2')
          next if File.directory?( result_dir) || File.exist?( result_archive)
          Dir.chdir( @db_folders.path_to_include_layer) {
            # NOTE(review): js_path is interpolated into a shell command line
            # unescaped; a path containing spaces or shell metacharacters
            # would break this call.
            system "nohup #{js_path} &"
            submitted_jobs << jid
          }
          @job_records.update_to_state_submitted( jid, "localhost")
          @db_folders.logger.info( self.class.to_s) {
            "submitted a job (jobid:#{jid}) to localhost"
          }
        end
      }
      submitted_jobs = nil if submitted_jobs.size == 0
      return submitted_jobs
    end

    # Returns the files reported by @db_folders.search_include_files with the
    # include layer path removed from each entry.
    def include_list
      list = @db_folders.search_include_files
      list.map! do |path|
        path.sub( @db_folders.path_to_include_layer, '')
      end
      return list
    end

    # Includes one result archive located in the include layer: expands it,
    # checks it against the tables, moves the data files into the run layer,
    # updates the tables and removes the leftovers.
    #
    # filename : name of the archive, relative to the include layer
    # Returns true on success. Returns nil when the file does not exist or
    # when the job described by the archive is not in the jobs table (in that
    # case the already expanded files are left where they are).
    # Raises RuntimeError when the archive conflicts with the tables or an
    # external command fails.
    def include( filename)
      @job_records.transaction {
        fpath = @db_folders.path_to_include_layer + filename
        return nil unless File.exist?(fpath)
        jinfo, rinfo = expand_and_parse( fpath)
        run_layer_path, run_recs = check_consistency( jinfo, rinfo)
        # check_consistency returns nil for an unknown job; without this guard
        # the next statement fails with NoMethodError on nil.
        return nil unless run_recs
        run_recs.transaction {
          folder_path = fpath.sub(/\.tar\.bz2\z/,'/')
          file_move( folder_path, run_layer_path, jinfo[:run_ids])
          update_tables( run_recs, jinfo, rinfo)
        }
        remove_files( fpath)
        @db_folders.logger.info(self.class.to_s) {
          "included a file (#{filename})"
        }
      }
      return true
    end

    # Calls #include for every entry of #include_list.
    # Returns true when all calls returned a truthy value, nil otherwise.
    def include_all
      flag = true
      include_list.each do |fname|
        flag = nil unless include(fname)
      end
      return flag
    end

    private

    # Shared argument check of cancel_jobs and execute_jobs_now.
    # Raises ArgumentError unless job_ids is an Array of Integer.
    def check_job_ids( job_ids)
      unless job_ids.is_a?(Array)
        raise ArgumentError, "job_ids must be an Array."
      end
      job_ids.each do |x|
        raise ArgumentError, "each element of job_ids must be an Integer" unless x.is_a?(Integer)
      end
    end

    # Runs an external command without going through a shell, so the
    # arguments need no quoting. Raises RuntimeError when the command cannot
    # be started or exits with a non-zero status.
    def run_command( *argv)
      return if system( *argv)
      raise "command : #{argv.join(' ')} failed"
    end

    # Expands the .tar.bz2 archive at fpath inside the include layer
    # (bunzip2, then tar) and parses the nera_status.rb file it contains.
    # Returns [jinfo, rinfo] as produced by NERA::JobScript#parse_status.
    def expand_and_parse( fpath)
      include_layer = @db_folders.path_to_include_layer
      # Escape the path: it is data, not a pattern.
      bz2_path = fpath.sub( /\A#{Regexp.escape(include_layer)}/, '')
      Dir.chdir( include_layer) {
        run_command( "bunzip2", bz2_path)
        tarpath = bz2_path.sub(/\.bz2\z/,'')
        run_command( "tar", "xvf", tarpath)
        stat_path = tarpath.sub(/\.tar\z/,'/nera_status.rb')
        js = NERA::JobScript.new(@db_folders)
        jinfo, rinfo = js.parse_status( stat_path)
        return jinfo, rinfo
      }
    end

    # Compares the parsed job info (jinfo) and run info (rinfo) with the
    # jobs, simulators, parameters and runs tables.
    #
    # Returns [run_layer_path, run_recs] when everything agrees and nil when
    # the job id is not in the jobs table.
    # Raises RuntimeError on any mismatch.
    def check_consistency( jinfo, rinfo)
      # record exists?
      found = @job_records.find_by_id( jinfo[:job_id] )
      return nil unless found

      # check consistency of the simulator
      conflicting unless jinfo[:simulator] == found[:simulator]
      sim_records = NERA::SimulatorRecords.new( @db_folders.path_to_simulators_table)
      sim_records.set_yaml_file( @db_folders.path_to_simulators_yaml )
      sim_class = NERA::Simulator.inherited_simulators.find { |sim| sim.to_s == jinfo[:simulator] }
      conflicting unless jinfo[:simulator] == sim_class.to_s

      # check consistency of the parameter
      conflicting unless jinfo[:param_id] == found[:parameter_id]
      f1 = sim_records.list.find { |item| item[:name] == jinfo[:simulator] }
      conflicting unless f1      # simulator is missing from the simulators table
      param_records = NERA::ParameterRecords.new( @db_folders.path_to_parameters_table( sim_class),
                                                  sim_records, f1[:id] )
      param_records.set_yaml_file( @db_folders.path_to_parameters_yaml( sim_class) )
      param = param_records.find_by_id( jinfo[:param_id])
      conflicting unless param   # parameter set is missing from the parameters table
      jinfo[:parameters].each_pair do |key, value|
        conflicting unless param[key] == value
      end

      # check consistency of the runs
      conflicting unless jinfo[:run_ids][0] == found[:run_id]
      conflicting unless jinfo[:run_ids].size == found[:number_of_runs]
      run_recs = NERA::RunRecords.new( @db_folders.path_to_runs_table( sim_class, jinfo[:param_id]),
                                       param_records, jinfo[:param_id])
      run_recs.set_yaml_file( @db_folders.path_to_runs_yaml( sim_class, jinfo[:param_id]) )
      found_runs = run_recs.list_all_not_finished.find_all do |rec|
        rec[:job_id] == jinfo[:job_id] && jinfo[:run_ids].include?(rec[:id])
      end
      conflicting unless jinfo[:run_ids].size == found_runs.size

      # check consistency of run_info
      raise "Run info is not consistent with job info!" unless jinfo[:run_ids] == rinfo.keys.sort

      run_layer_path = @db_folders.path_to_run_layer( sim_class, jinfo[:param_id])
      return run_layer_path, run_recs
    end

    # Raised by check_consistency on every mismatch.
    def conflicting
      raise RuntimeError, "This information is conflicting with the table."
    end

    # Moves every file of folder_path/<run id>/ to run_layer_path, prefixing
    # the file name with the six-digit, zero-padded run id and "_".
    # Raises RuntimeError when a run directory is missing or a destination
    # file already exists.
    def file_move( folder_path, run_layer_path, run_ids)
      run_ids.each do |rid|
        data_folder = folder_path + rid.to_s
        raise "must not happen" unless FileTest.directory?(data_folder)
        Dir.glob(data_folder + '/*').each do |data_file|
          dest = run_layer_path + sprintf("%06d_",rid) + File.basename( data_file)
          # File.exists? was removed in Ruby 3.2
          raise "file already exists." if File.exist?(dest)
          # options are passed as keywords; a positional Hash raises
          # ArgumentError on Ruby 3
          FileUtils.mv( data_file, dest, :verbose => true )
        end
      end
    end

    # Marks every run of rinfos as finished in run_recs, removes the job from
    # the jobs table and appends the removed record, extended with fields
    # taken from jinfo, as YAML to the finished jobs file.
    def update_tables( run_recs, jinfo, rinfos)
      d = DateTime.now
      rinfos.each_pair do |rid, info|
        f = run_recs.update_state_to_finished( rid, info[:start_at], info[:finish_at], d,
                                               info[:real_time], info[:user_time], jinfo[:host_name] )
        raise "must not happen" unless f
      end

      destroyed = @job_records.destroy( jinfo[:job_id])
      destroyed[:simulator] = jinfo[:simulator]
      destroyed[:parameters] = jinfo[:parameters]
      destroyed[:start_at] = jinfo[:start_at]
      destroyed[:host_name] = jinfo[:host_name]
      destroyed[:finish_at] = jinfo[:finish_at]
      File.open( @db_folders.path_to_finished_jobs_file, "a") do |io|
        YAML.dump( destroyed, io)
        io.flush
      end
    end

    # Deletes the .tar file and the expanded folder that belong to the
    # archive fpath, and moves the matching job script (when it still exists
    # in the job layer) into the 'finished_jobs/' sub folder.
    def remove_files( fpath)
      tar_path = fpath.sub(/\.bz2\z/,'')
      FileUtils.rm( tar_path)
      fold_path = tar_path.sub(/\.tar\z/,'/')
      FileUtils.rm_r( fold_path)
      sh_path = @db_folders.path_to_job_layer + File.basename( tar_path.sub(/\.tar\z/,'.sh'))
      if FileTest.exist?(sh_path)
        ff = @db_folders.path_to_job_layer + 'finished_jobs/'
        FileUtils.mkdir_p( ff)
        FileUtils.mv(sh_path, ff )
      end
    end

  end
end