require 'date'
require 'fileutils'
require 'yaml'
require 'nera_db_folders'
require 'nera_simulator_records'
require 'nera_parameter_records'
require 'nera_run_records'
require 'nera_job_records'
require 'nera_job_script'

module NERA

  # Controller for the job layer of a NERA database folder.
  #
  # It lists the jobs currently stored in the jobs table, cancels jobs, and
  # "includes" result archives (*.tar.bz2 files found in the include layer)
  # by moving their data files into the run layer and updating the run and
  # job tables.
  #
  # Instance state:
  #   @db_folders  - instance of NERA::DbFolders
  #   @job_records - instance of NERA::JobRecords
  class JobLayerController

    # @param path_db_folder [String] path handed to NERA::DbFolders.new
    # @raise [ArgumentError] when path_db_folder is not a String
    def initialize(path_db_folder)
      unless path_db_folder.is_a?(String)
        raise ArgumentError, "path_db_folder must be a String"
      end
      @db_folders = NERA::DbFolders.new(path_db_folder)
      @job_records = NERA::JobRecords.new(@db_folders.path_to_jobs_table)
    end

    public

    # @return the job layer path as reported by the DbFolders instance
    def path_to_job_layer
      @db_folders.path_to_job_layer
    end

    # Builds a CSV-like listing of every record in the jobs table.
    #
    # DateTime values are cut at the 'T' of their string form, so only the
    # date part is printed.
    #
    # @return [Array(String, Array<String>)] the header line and one
    #   ", "-joined line per record
    def not_finished_list_in_csv
      records = @job_records.list_all
      keys = @job_records.keys
      header = "id, state, simulator, param_id, run_id, num_runs, created_at, updated_at, host_name"
      rows = records.map do |record|
        keys.map { |key| csv_field(record[key]) }.join(", ")
      end
      return header, rows
    end

    # Cancels the given jobs: removes each job from the jobs table, removes
    # the job id from the corresponding runs table, deletes the job script
    # and rewrites the runs YAML file. Job ids for which the jobs table
    # returns nothing on destroy are skipped.
    #
    # @param job_ids [Array<Integer>] ids of the jobs to cancel
    # @return [Array<Integer>, nil] ids actually destroyed, or nil if none
    # @raise [ArgumentError] when job_ids is not an Array of Integers
    def cancel_jobs(job_ids)
      raise ArgumentError, "job_ids must be an Array." unless job_ids.is_a?(Array)
      unless job_ids.all? { |x| x.is_a?(Integer) }
        raise ArgumentError, "each element of job_ids must be an Integer"
      end

      destroyed_jobids = []
      @job_records.transaction do
        job_ids.each do |jid|
          d = @job_records.destroy(jid)
          next unless d

          klass = simulator_class(d[:simulator])
          raise RuntimeError, "must not happen" unless klass
          run_records = NERA::RunRecords.new(
            @db_folders.path_to_runs_table(klass, d[:parameter_id]))
          run_records.transaction do
            a = run_records.destroy_job_id(jid)
            raise RuntimeError, "must not happen" unless a
            FileUtils.rm(@db_folders.path_to_job_script(jid))
          end
          dump_yaml(@db_folders.path_to_runs_yaml(klass, d[:parameter_id]),
                    run_records.list_all)
          destroyed_jobids << jid
        end
      end
      dump_in_yaml
      destroyed_jobids.empty? ? nil : destroyed_jobids
    end

    # @return [Array<String>] the files reported by
    #   DbFolders#search_include_files, with the first occurrence of the
    #   include layer path removed from each entry
    def include_list
      include_dir = @db_folders.path_to_include_layer
      # map (not map!) so the array owned by DbFolders is left untouched.
      @db_folders.search_include_files.map { |path| path.sub(include_dir, '') }
    end

    # Includes one result archive located in the include layer.
    #
    # The archive is expanded, its status file is checked against the job,
    # parameter and run tables, the data files are moved into the run layer,
    # the tables are updated and the temporary files are removed.
    #
    # @param filename [String] archive name relative to the include layer
    # @return [true, nil] true on success, nil when the file does not exist
    # @raise [RuntimeError] when the archive conflicts with the tables or no
    #   job record exists for it
    def include(filename)
      fpath = @db_folders.path_to_include_layer + filename
      # Checked before opening the transaction so that we never leave the
      # transaction block through a non-local return.
      return nil unless File.exist?(fpath)

      @job_records.transaction do
        jinfo, rinfo = expand_and_parse(fpath)
        run_layer_path, run_recs, run_yaml_path = check_consistency(jinfo, rinfo)
        unless run_recs
          # check_consistency returns nil when the job id is not in the table.
          raise RuntimeError,
                "no job record found for job_id #{jinfo[:job_id]} (#{filename})"
        end
        run_recs.transaction do
          folder_path = fpath.sub(/\.tar\.bz2\z/, '/')
          file_move(folder_path, run_layer_path, jinfo[:run_ids])
          update_tables(run_recs, jinfo, rinfo)
        end
        remove_files(fpath)
        dump_yaml(run_yaml_path, run_recs.list_all)
      end
      dump_in_yaml
      true
    end

    private

    # Formats one table value for the CSV listing.
    def csv_field(value)
      text = value.to_s
      value.is_a?(DateTime) ? text.split('T')[0] : text
    end

    # Resolves a simulator name stored in the tables to its class.
    def simulator_class(name)
      # NOTE(review): the SimulatorRecords instance is not used directly;
      # presumably constructing it makes the simulator classes available
      # before the name is evaluated -- confirm before removing.
      NERA::SimulatorRecords.new(@db_folders.path_to_simulators_table)
      # SECURITY NOTE(review): eval on a string read from a table / status
      # file. Kept as in the original; consider Object.const_get once the
      # naming scheme of simulator classes is confirmed.
      eval(name)
    end

    # Writes +data+ to +path+ as YAML, replacing any previous content.
    def dump_yaml(path, data)
      File.open(path, 'w') { |io| YAML.dump(data, io) }
    end

    # Runs an external command without going through a shell, so that file
    # names containing spaces or shell metacharacters are passed verbatim.
    def run_command(*argv)
      return if system(*argv)
      raise "command : #{argv.join(' ')} failed"
    end

    # Expands the archive (bunzip2, then tar) inside the include layer and
    # parses the nera_status.rb file found in the expanded folder.
    #
    # @return the pair returned by NERA::JobScript#parse_status
    #   (job info, run info)
    # @raise [RuntimeError] when bunzip2 or tar fails
    def expand_and_parse(fpath)
      include_dir = @db_folders.path_to_include_layer
      bz2_path = fpath.sub(/\A#{Regexp.escape(include_dir)}/, '')
      Dir.chdir(include_dir) do
        run_command('bunzip2', bz2_path)
        tarpath = bz2_path.sub(/\.bz2\z/, '')
        run_command('tar', 'xvf', tarpath)
        stat_path = tarpath.sub(/\.tar\z/, '/nera_status.rb')
        jinfo, rinfo = NERA::JobScript.new(@db_folders).parse_status(stat_path)
        [jinfo, rinfo]
      end
    end

    # Verifies that the parsed job info agrees with the jobs, parameters and
    # runs tables.
    #
    # @return [Array, nil] [run_layer_path, run_records, run_yaml_path], or
    #   nil when the job id is not present in the jobs table
    # @raise [RuntimeError] on any mismatch
    def check_consistency(jinfo, rinfo)
      # record exists?
      found = @job_records.find_by_id(jinfo[:job_id])
      return nil unless found

      # check consistency of the simulator
      conflicting unless jinfo[:simulator] == found[:simulator]
      sim_class = simulator_class(jinfo[:simulator])
      conflicting unless jinfo[:simulator] == sim_class.to_s

      # check consistency of the parameter
      conflicting unless jinfo[:param_id] == found[:parameter_id]
      param_records = NERA::ParameterRecords.new(
        @db_folders.path_to_parameters_table(sim_class), sim_class)
      param = param_records.find_by_id(jinfo[:param_id])
      conflicting unless param
      jinfo[:parameters].each_pair do |key, value|
        conflicting unless param[key] == value
      end

      # check consistency of the runs
      conflicting unless jinfo[:run_ids][0] == found[:run_id]
      conflicting unless jinfo[:run_ids].size == found[:number_of_runs]
      run_recs = NERA::RunRecords.new(
        @db_folders.path_to_runs_table(sim_class, jinfo[:param_id]))
      run_yaml_path = @db_folders.path_to_runs_yaml(sim_class, jinfo[:param_id])
      found_runs = run_recs.list_all_not_finished.find_all do |rec|
        rec[:job_id] == jinfo[:job_id] && jinfo[:run_ids].include?(rec[:id])
      end
      conflicting unless jinfo[:run_ids].size == found_runs.size

      # check consistency of run_info
      unless jinfo[:run_ids] == rinfo.keys.sort
        raise "Run info is not consistent with job info!"
      end

      run_layer_path = @db_folders.path_to_run_layer(sim_class, jinfo[:param_id])
      return run_layer_path, run_recs, run_yaml_path
    end

    # @raise [RuntimeError] always
    def conflicting
      raise RuntimeError, "This information is conflicting with the table."
    end

    # Moves every file of <folder_path><run id>/ to
    # <run_layer_path><run id, zero-padded to 6 digits>_<file name>.
    #
    # @raise [RuntimeError] when a run folder is missing or a destination
    #   file already exists
    def file_move(folder_path, run_layer_path, run_ids)
      run_ids.each do |rid|
        data_folder = folder_path + rid.to_s
        raise "must not happen" unless FileTest.directory?(data_folder)
        prefix = run_layer_path + sprintf("%06d_", rid)
        Dir.glob(data_folder + '/*').each do |data_file|
          dest = prefix + File.basename(data_file)
          raise "file already exists." if File.exist?(dest)
          # Options are passed without braces so that they are accepted both
          # as a trailing Hash (old Rubies) and as keywords (Ruby 3+).
          FileUtils.mv(data_file, dest, :verbose => true)
        end
      end
    end

    # Marks every run of the job as finished, removes the job from the jobs
    # table and appends the removed record, completed with the job info, to
    # the finished-jobs file.
    def update_tables(run_recs, jinfo, rinfos)
      now = DateTime.now
      rinfos.each_pair do |rid, info|
        f = run_recs.update_state_to_finished(
          rid, info[:start_at], info[:finish_at], now,
          info[:real_time], info[:user_time], jinfo[:host_name])
        raise "must not happen" unless f
      end

      destroyed = @job_records.destroy(jinfo[:job_id])
      raise "must not happen" unless destroyed
      destroyed[:simulator] = jinfo[:simulator]
      destroyed[:parameters] = jinfo[:parameters]
      destroyed[:start_at] = jinfo[:start_at]
      destroyed[:host_name] = jinfo[:host_name]
      destroyed[:finish_at] = jinfo[:finish_at]
      File.open(@db_folders.path_to_finished_jobs_file, "a") do |io|
        YAML.dump(destroyed, io)
      end
    end

    # Deletes the expanded tar file and folder, and moves the job script
    # (if it still exists in the job layer) to <job layer>/finished_jobs/.
    def remove_files(fpath)
      tar_path = fpath.sub(/\.bz2\z/, '')
      FileUtils.rm(tar_path)
      FileUtils.rm_r(tar_path.sub(/\.tar\z/, '/'))

      job_layer = @db_folders.path_to_job_layer
      sh_path = job_layer + File.basename(tar_path.sub(/\.tar\z/, '.sh'))
      return unless FileTest.exist?(sh_path)

      finished_dir = job_layer + 'finished_jobs/'
      FileUtils.mkdir_p(finished_dir)
      FileUtils.mv(sh_path, finished_dir)
    end

    public

    # Includes every file returned by #include_list.
    #
    # @return [true, nil] true when every #include call returned a truthy
    #   value, nil otherwise
    def include_all
      flag = true
      include_list.each do |fname|
        flag = nil unless include(fname)
      end
      dump_in_yaml
      flag
    end

    # Writes the current content of the jobs table to the jobs YAML file.
    def dump_in_yaml
      dump_yaml(@db_folders.path_to_jobs_yaml, @job_records.list_all)
    end
  end
end