require 'rbbt-util'
require 'rbbt/util/cmd'

# Helpers to build, submit and monitor SLURM batch scripts that run rbbt
# commands, and to relay job dependencies to the remote server.
module Marenostrum
  SERVER = 'mn1'

  # Raised by SLURM.issue_template on a dry run instead of submitting the job.
  # Carries the directory that holds the generated SLURM files.
  class SBATCH < Exception
    attr_accessor :directory

    def initialize(directory)
      @directory = directory
    end
  end

  module SLURM

    # Builds the text of a SLURM batch script that runs an rbbt command.
    #
    # The script is the concatenation of five sections: the #SBATCH header,
    # environment setup, container preparation, the command itself and a coda
    # that syncs results, optionally wipes the container directory and records
    # the exit status.
    #
    # @param args [Array<String>] rbbt command-line words ('--' entries are
    #   dropped, words containing a space are double-quoted)
    # @param options [Hash] consumed destructively. Recognized keys: :drbbt,
    #   :singularity, :contain, :sync, :user_group, :contain_and_sync,
    #   :wipe_container ("pre", "post" or "both"), :copy_image, :exclusive,
    #   :highmem, :queue, :task_cpus, :nodes, :time, :inputs_dir,
    #   :config_keys, :name and :slurm_basedir. Every remaining key is
    #   appended to the rbbt command as a `--key` flag.
    # @return [String] the batch script
    #
    # Side effect: creates the slurm_basedir directory.
    def self.template(args, options = {})

      development      = options.delete :drbbt
      singularity      = options.delete :singularity
      contain          = options.delete :contain
      sync             = options.delete :sync
      user_group       = options.delete :user_group
      contain_and_sync = options.delete :contain_and_sync
      wipe_container   = options.delete :wipe_container
      copy_image       = options.delete :copy_image
      exclusive        = options.delete :exclusive
      highmem          = options.delete :highmem

      queue            = options.delete(:queue) || 'bsc_ls'
      task_cpus        = options.delete(:task_cpus) || 1
      nodes            = options.delete(:nodes) || 1
      time             = options.delete(:time) || "0:00:10"

      inputs_dir       = options.delete :inputs_dir
      config_keys      = options.delete :config_keys

      user = ENV['USER'] || `whoami`.strip
      group = File.basename(File.dirname(ENV['HOME']))

      # :contain_and_sync only fills in the values the caller left unset
      if contain_and_sync
        contain = "/scratch/tmp/rbbt-#{user}" if contain.nil?
        sync = "~/.rbbt/var/jobs" if sync.nil?
        wipe_container = "post" if wipe_container.nil?
      end

      # "" and "none" explicitly disable containment / syncing
      contain = nil if contain == "" || contain == "none"
      sync = nil if sync == "" || sync == "none"

      contain = File.expand_path(contain) if contain

      # The default name is a digest of the remaining options and the args.
      # The digest is computed before :name is removed so that it stays stable.
      name = options[:name] ||= Misc.obj2digest({:options => options.collect{|k,v| [k,v]}.sort_by{|k,v| k.to_s }, :args => args})
      options.delete(:name)
      slurm_basedir = options[:slurm_basedir] ||= File.expand_path(File.join('~/rbbt-slurm', name))
      options.delete(:slurm_basedir)

      rbbt_cmd = args.reject{|e| e == '--' }.collect{|e| e.include?(" ") ? '"' + e + '"' : e } * " "

      # Whatever is left in options becomes command-line flags
      option_flags = options.collect do |o,v|
        flag = "--#{o}"
        case v
        when TrueClass
          flag
        when FalseClass
          "#{flag}=false"
        else
          "#{flag} '#{v}'"
        end
      end

      rbbt_cmd += " " + option_flags * " "

      rbbt_cmd << " --config_keys='#{config_keys}'" if config_keys && !config_keys.empty?

      # Values without a ':' are treated as time spans and reformatted
      time = Misc.format_seconds Misc.timespan(time) unless time.include? ":"

      #{{{ PREPARE LOCAL LOGFILES

      Open.mkdir slurm_basedir

      fout  = File.join(slurm_basedir, 'std.out')
      ferr  = File.join(slurm_basedir, 'std.err')
      fexit = File.join(slurm_basedir, 'exit.status')
      fsync = File.join(slurm_basedir, 'sync.log')

      #{{{ GENERATE TEMPLATE

      # HEADER
      header = <<-EOF
#!/bin/bash
#SBATCH --qos="#{queue}"
#SBATCH --job-name="#{name}"
#SBATCH --workdir="#{Dir.pwd}"
#SBATCH --output="#{fout}"
#SBATCH --error="#{ferr}"
#SBATCH --cpus-per-task="#{task_cpus}"
#SBATCH --time="#{time}"
#SBATCH --nodes="#{nodes}"
      EOF

      if highmem
        header += <<-EOF
#SBATCH --constraint=highmem
        EOF
      end

      if exclusive
        header += <<-EOF
#SBATCH --exclusive
        EOF
      end

      # Records the command as it stands here; later rewrites of rbbt_cmd
      # (inputs dir, --workdir_all) are not reflected in this header line
      header += <<-EOF
#CMD: #{rbbt_cmd}
      EOF

      # ENV
      env = <<-EOF
# Prepare env
[[ -f ~/config/load.sh ]] && source ~/config/load.sh
module load java

# Calculate max available memory
let "MAX_MEMORY=$SLURM_MEM_PER_CPU * $SLURM_CPUS_ON_NODE"
      EOF

      # RUN
      exec_cmd = %(env _JAVA_OPTIONS="-Xms1g -Xmx${MAX_MEMORY}m")

      # Initialized for every path: the coda prepends to prep even when
      # singularity is off, which used to fail with nil
      prep = ""

      if singularity
        #{{{ SINGULARITY

        singularity_exec = %(singularity exec -e -B $SINGULARITY_OPT_DIR:/singularity_opt/ -B /apps/)

        env += <<-EOF
module load intel/2018.1
module load singularity
PROJECTS_ROOT="/gpfs/projects/bsc26/"
SINGULARITY_IMG="$PROJECTS_ROOT/rbbt.singularity.img"
SINGULARITY_OPT_DIR="$PROJECTS_ROOT/singularity_opt/"
SINGULARITY_RUBY_INLINE="$HOME/.singularity_ruby_inline"
mkdir -p "$SINGULARITY_RUBY_INLINE"
        EOF

        if contain
          scratch_group_dir = File.join('/gpfs/scratch/', group)
          projects_group_dir = File.join('/gpfs/projects/', group)

          prep += <<-EOF

# Prepare container dir
CONTAINER_DIR="#{contain}"
mkdir -p $CONTAINER_DIR/.rbbt/etc/

for dir in .ruby_inline git home; do
    mkdir -p $CONTAINER_DIR/$dir
done

for tmpd in persist_locks produce_locks R_sockets sensiblewrite sensiblewrite_locks step_info_locks tsv_open_locks; do
    mkdir -p $CONTAINER_DIR/.rbbt/tmp/$tmpd
done

# Copy environment
cp ~/.rbbt/etc/environment $CONTAINER_DIR/.rbbt/etc/

# Set search_paths
echo "singularity: /singularity_opt/{PKGDIR}/{TOPLEVEL}/{SUBPATH}" > $CONTAINER_DIR/.rbbt/etc/search_paths
echo "rbbt_user: /home/rbbt/.rbbt/{TOPLEVEL}/{SUBPATH}" >> $CONTAINER_DIR/.rbbt/etc/search_paths
echo "home: $CONTAINER_DIR/home/{TOPLEVEL}/{SUBPATH}" >> $CONTAINER_DIR/.rbbt/etc/search_paths
echo "group_projects: #{projects_group_dir}/{PKGDIR}/{TOPLEVEL}/{SUBPATH}" >> $CONTAINER_DIR/.rbbt/etc/search_paths
echo "group_scratch: #{scratch_group_dir}/{PKGDIR}/{TOPLEVEL}/{SUBPATH}" >> $CONTAINER_DIR/.rbbt/etc/search_paths
echo "user_projects: #{projects_group_dir}/#{user}/{PKGDIR}/{TOPLEVEL}/{SUBPATH}" >> $CONTAINER_DIR/.rbbt/etc/search_paths
echo "user_scratch: #{scratch_group_dir}/#{user}/{PKGDIR}/{TOPLEVEL}/{SUBPATH}" >> $CONTAINER_DIR/.rbbt/etc/search_paths
          EOF

          if user_group && group != user_group
            prep += <<-EOF

# Add user_group search_path
echo "#{user_group}: /gpfs/projects/#{user_group}/{PKGDIR}/{TOPLEVEL}/{SUBPATH}" >> $CONTAINER_DIR/.rbbt/etc/search_paths
            EOF
          end

          if inputs_dir
            prep += <<-EOF

# Copy inputs
[[ -d '#{inputs_dir}' ]] && cp -R '#{inputs_dir}' $CONTAINER_DIR/inputs
            EOF
            # Point the command at the copy inside the container directory
            rbbt_cmd = rbbt_cmd.sub(inputs_dir, "#{contain}/inputs")
          end

          if copy_image
            # NOTE(review): this heredoc was garbled in the reviewed text (only
            # `>&2` and the SINGULARITY_IMG assignment survived). The copy
            # command below is a reconstruction -- confirm against version control.
            prep += <<-EOF

# Copy image
rsync -avz "$SINGULARITY_IMG" "$CONTAINER_DIR/rbbt.singularity.img" 1>&2
SINGULARITY_IMG="$CONTAINER_DIR/rbbt.singularity.img"
            EOF
          end

          if wipe_container == "pre" || wipe_container == "both"
            prep += <<-EOF

# Clean container pre
singularity exec -e -C -H "$CONTAINER_DIR" "$SINGULARITY_IMG" rm -Rfv .rbbt/var/jobs &>> #{fsync}
singularity exec -e -C -H "$CONTAINER_DIR" "$SINGULARITY_IMG" rbbt system clean -f &>> #{fsync}
singularity exec -e -C -H "$CONTAINER_DIR" "$SINGULARITY_IMG" rm -Rfv tmp/ &>> #{fsync}
            EOF
          end
        end

        if contain
          # The trailing backslashes are Ruby line continuations, so this is a
          # single shell line.
          # NOTE(review): when user_group is nil the third bind expands to
          # "-B /gpfs/projects/"; the search_path block above guards on
          # user_group but this one does not -- confirm which is intended.
          singularity_exec << %( -C -H "$CONTAINER_DIR" \
 -B /scratch/tmp \
 #{ group != user_group ? "-B /gpfs/projects/#{user_group}" : "" } \
 -B #{scratch_group_dir} \
 -B #{projects_group_dir} \
 -B "$SINGULARITY_RUBY_INLINE":"$CONTAINER_DIR/.ruby_inline":rw \
 -B ~/git:"$CONTAINER_DIR/git":ro \
 #{Open.exists?('~/.rbbt/software/opt/')? '-B ~/.rbbt/software/opt/:"/opt/":ro' : '' } \
 -B ~/.rbbt:"$CONTAINER_DIR/home/":ro \
 "$SINGULARITY_IMG")
          exec_cmd << ' TMPDIR="$CONTAINER_DIR/.rbbt/tmp" '
        else
          singularity_exec += %( -B "$SINGULARITY_RUBY_INLINE":"$HOME/.ruby_inline":rw "$SINGULARITY_IMG" )
        end

        if development
          exec_cmd += " rbbt --dev='#{development}'"
        else
          exec_cmd += ' rbbt'
        end

        exec_cmd = singularity_exec + " " + exec_cmd
      else
        if development
          exec_cmd << " " << %(~/git/rbbt-util/bin/rbbt --dev=#{development})
        else
          exec_cmd << " " << 'rbbt'
        end

        if contain
          rbbt_cmd << " " << %(--workdir_all='#{contain}')
        end
      end

      cmd = <<-EOF
#{exec_cmd} \\
#{rbbt_cmd}
      EOF

      run = <<-EOF

# Run command
#{cmd}

# Save exit status
exit_status=$?

      EOF

      # CODA
      coda = ""
      if sync
        if singularity
          coda += <<-EOF
singularity exec -e -C -H "$CONTAINER_DIR" "$SINGULARITY_IMG" rbbt system clean all -q &>> #{fsync}
          EOF
        else
          coda += <<-EOF
rbbt system clean all -q &>> #{fsync}
          EOF
        end

        # "source => target" selects a sub-directory of the container to sync;
        # otherwise the container's .rbbt/var/jobs is synced
        if sync.include?("=>")
          source, _sep, sync = sync.partition("=>")
          source = source.strip
          sync = sync.strip
          source = File.join(File.expand_path(contain), source)
        else
          source = File.join(File.expand_path(contain), '.rbbt/var/jobs')
        end

        target = File.expand_path(sync)
        coda += <<-EOF

# Sync data to target location
mkdir -p "$(dirname '#{target}')"
rsync -avztAXHP --copy-unsafe-links "#{source}/" "#{target}/" &>> #{fsync}
sync_es="$?"
find '#{target}' -type l -ls | awk '$13 ~ /^#{target.gsub('/','\/')}/ { sub("#{source}", "#{target}", $13); print $11, $13 }' | while read A B; do rm $A; ln -s $B $A; done
        EOF

        if contain && (wipe_container == "post" || wipe_container == "both")
          # This check is prepended so it runs before the container is prepared
          prep = <<-EOF + prep
if ls -A '#{contain}' &> /dev/null ; then
    echo "ERROR: Container directory not empty, refusing to wipe. #{contain}" &>> #{fsync}
fi
          EOF

          if singularity
            coda += <<-EOF
singularity exec -e -C -H "$CONTAINER_DIR" "$SINGULARITY_IMG" rm -v /dev/shm/sem.*.{in,out,process} /dev/shm/sem.Session-PID.*.sem 2> /dev/null >> #{fsync}

# Clean container directory
#if [ $exit_status == '0' -a $sync_es == '0' ]; then
    singularity exec -e -C -H "$CONTAINER_DIR" "$SINGULARITY_IMG" rbbt system clean -f &>> #{fsync}
    singularity exec -e -C -H "$CONTAINER_DIR" "$SINGULARITY_IMG" rm -Rfv .rbbt/var/jobs &>> #{fsync}
    singularity exec -e -C -H "$CONTAINER_DIR" "$SINGULARITY_IMG" rm -Rfv tmp/ &>> #{fsync}
#else
#    echo "ERROR: Process failed or results could not sync correctly. Contain directory not purged" &>> #{fsync}
#fi
            EOF
          else
            # sync_es is deliberately NOT unset here: the final exit block
            # below still reads it, and unsetting it first made the script
            # always take the `exit $sync_es` branch with an empty value
            coda += <<-EOF
#{exec_cmd} system clean
if [ $exit_status == '0' -a $sync_es == '0' ]; then
    rm -Rfv #{contain} &>> #{fsync}
else
    echo "ERROR: Process failed or results could not sync correctly. Contain directory not purged" &>> #{fsync}
fi
            EOF
          end
        end
      end

      coda += <<-EOF

# Write exit status to file
echo $exit_status > #{fexit}
      EOF

      if sync
        coda += <<-EOF
if [ "$sync_es" == '0' ]; then
    unset sync_es
    exit $exit_status
else
    exit $sync_es
fi
        EOF
      else
        coda += <<-EOF
exit $exit_status
        EOF
      end

      [header, env, prep, run, coda] * "\n"
    end

    # Writes the batch script into options[:slurm_basedir] and submits it with
    # sbatch, unless the directory shows the job was already issued or finished.
    #
    # @param template [String] batch script text
    # @param options [Hash] reads :slurm_basedir and :clean_job; removes
    #   :dry_run from the hash
    # @raise [Marenostrum::SBATCH] on a dry run, after printing the commands
    #   to execute and monitor the job
    def self.issue_template(template, options = {})

      slurm_basedir = options[:slurm_basedir]
      Open.mkdir slurm_basedir

      dry_run = options.delete :dry_run

      fout  = File.join(slurm_basedir, 'std.out')
      ferr  = File.join(slurm_basedir, 'std.err')
      fjob  = File.join(slurm_basedir, 'job.id')
      fexit = File.join(slurm_basedir, 'exit.status')
      fsync = File.join(slurm_basedir, 'sync.log')
      fcmd  = File.join(slurm_basedir, 'command.slurm')

      job = nil
      if options[:clean_job]
        [fcmd, fjob, fout, ferr, fsync, fexit].each do |file|
          Open.rm file if Open.exists? file
        end
      end

      # An exit status file means the job already ran
      return if Open.exists?(fexit)

      STDERR.puts Log.color(:magenta, "Issuing SLURM file: #{fcmd}")
      STDERR.puts template

      # File.exist? rather than File.exists?, which Ruby 3.2 removed
      Open.write(fcmd, template) unless File.exist? fcmd

      if File.exist?(fjob)
        # Already submitted: reuse the recorded job id
        job = Open.read(fjob).to_i
      else
        if File.exist?(fout)
          return
        elsif dry_run
          STDERR.puts Log.color(:magenta, "To execute run: ") + Log.color(:blue, "sbatch '#{slurm_basedir}/command.slurm'")
          STDERR.puts Log.color(:magenta, "To monitor progress run (needs local rbbt): ") + Log.color(:blue, "rbbt mn --tail -w '#{slurm_basedir}'")
          raise Marenostrum::SBATCH, slurm_basedir
        else
          Open.rm fsync
          Open.rm fexit
          Open.rm fout
          Open.rm ferr
          # The first run of digits in sbatch's output is taken as the job id
          job = CMD.cmd("sbatch '#{fcmd}'").read.scan(/\d+/).first.to_i
          Open.write(fjob, job.to_s)
        end
      end
    end

    # Prints the squeue status of the job recorded in slurm_basedir and, when
    # tail is truthy, waits for its output file and streams the log files
    # until squeue no longer lists the job.
    #
    # @param slurm_basedir [String] directory holding job.id, std.out, std.err
    # @param tail [Boolean, Symbol] false/nil only prints the status;
    #   :STDERR streams std.err only; any other truthy value streams both
    def self.follow_job(slurm_basedir, tail = true)
      fjob    = File.join(slurm_basedir, 'job.id')
      fout    = File.join(slurm_basedir, 'std.out')
      ferr    = File.join(slurm_basedir, 'std.err')
      fstatus = File.join(slurm_basedir, 'job.status')

      job = Open.read(fjob).strip if Open.exists?(fjob)

      if job
        status_txt = CMD.cmd("squeue --job #{job}").read
        STDERR.puts Log.color(:magenta, "Status [#{job.to_i}]:")
        STDERR.puts status_txt
        lines = status_txt.split("\n").length
      end

      if tail
        Log.severity = 10
        # NOTE(review): with no recorded job id this loop polls without
        # sleeping, and the squeue loop further down never ends because
        # include?("") is always true -- confirm callers always have a job.id
        while !File.exist?(fout)
          if job
            STDERR.puts
            Log.clear_line(STDERR)
            STDERR.write Log.color(:magenta, "Waiting for Output")
            3.times do
              STDERR.write Log.color(:magenta, ".")
              sleep 1
            end
            status_txt = CMD.cmd("squeue --job #{job}").read
            # Clear the previous status report before printing the new one
            lines.times do
              Log.clear_line(STDERR)
            end
            Log.clear_line(STDERR)
            STDERR.puts Log.color(:magenta, "Status [#{job.to_i}]:")
            STDERR.puts status_txt
            lines = status_txt.split("\n").length
          end
        end
        STDERR.puts
        Log.clear_line(STDERR)
        STDERR.puts Log.color(:magenta, "Output:")
        begin
          CMD.cmd("squeue --job #{job} > #{fstatus}")
          out = CMD.cmd("tail -f '#{fout}'", :pipe => true) if File.exist?(fout) && tail != :STDERR
          err = CMD.cmd("tail -f '#{ferr}'", :pipe => true) if File.exist?(ferr)

          terr = Misc.consume_stream(err, true, STDERR) if err
          tout = Misc.consume_stream(out, true, STDOUT) if out

          sleep 3 while CMD.cmd("squeue --job #{job}").read.include? job.to_s
        rescue Aborted
          # Raised by the caller to stop following; cleanup happens in ensure
        ensure
          # Best-effort cleanup: failures while stopping the tails are ignored
          begin
            terr.exit if terr
            tout.exit if tout
            err.close if err
            err.join if err
          rescue Exception
          end

          begin
            out.close if out
            out.join if out
          rescue Exception
          end
        end
      end
    end

    # Blocks until the exit.status file appears in slurm_basedir.
    #
    # @param slurm_basedir [String] directory of the SLURM job
    # @param time [Numeric] seconds to sleep between checks
    def self.wait_for_job(slurm_basedir, time = 1)
      fexit = File.join(slurm_basedir, 'exit.status')

      sleep time until Open.exists?(fexit)
    end

    # Runs a workflow job through SLURM: saves its inputs, builds and issues
    # the batch script and, when :tail is set, follows the job until it ends
    # and links its result to the original job path.
    #
    # @param job [Step] job to run (uses workflow, task_name and path)
    # @param options [Hash] :dry_run, :tail, :keep_SLURM_slurm_basedir and
    #   :SLURM_basedir are consumed here; the rest is passed on to template
    def self.run_job(job, options = {})
      options = IndiferentHash.setup(options.dup)

      dry_run = options.delete :dry_run
      tail    = options.delete :tail

      workflow = job.workflow
      task     = job.task_name

      keep_slurm_basedir = options.delete :keep_SLURM_slurm_basedir
      slurm_basedir = options.delete :SLURM_basedir
      slurm_basedir = "~/rbbt-slurm" if slurm_basedir.nil?

      TmpFile.with_file(nil, !keep_slurm_basedir, :tmpdir => slurm_basedir, :prefix => "SLURM_rbbt_job-") do |tmp_directory|
        options[:slurm_basedir] ||= tmp_directory
        slurm_basedir = options[:slurm_basedir]
        inputs_dir = File.join(tmp_directory, 'inputs_dir')
        saved = Step.save_job_inputs(job, inputs_dir, options)

        cmd = ['workflow', 'task', workflow.to_s, task.to_s, '-pf']
        if saved
          options[:inputs_dir] = inputs_dir
          cmd += ['--load_inputs', inputs_dir]
        end
        cmd += ['--log', (options[:log] || Log.severity).to_s]

        template = self.template(cmd, options)
        # template removes :slurm_basedir from options, so it is passed back in
        self.issue_template(template, options.merge(:slurm_basedir => slurm_basedir, :dry_run => dry_run))

        return unless tail

        t_monitor = Thread.new do
          self.follow_job(slurm_basedir, :STDERR)
        end
        self.wait_for_job(slurm_basedir)
        t_monitor.raise Aborted
        return unless Open.read(File.join(slurm_basedir, 'exit.status')).strip == '0'

        # std.out is read as the path of the job produced on the SLURM side
        path = Open.read(File.join(slurm_basedir, 'std.out')).strip
        if Open.exists?(path) && job.path != path
          Log.info "Path of SLURM job #{path} is different from original job #{job.path}. Stablishing link."
          Open.ln path, job.path
          Open.ln path + '.info', job.path + '.info' if Open.exists?(path + '.info')
          Open.ln path + '.files', job.path + '.files' if Open.exists?(path + '.files')
        end
      end
    end
  end

  # Migrates the dependencies of job that are done, or that failed with a
  # non-recoverable error, using Step.migrate.
  #
  # @param job [Step] job whose dependencies are relayed
  # @param options [Hash] defaults: :target => 'mn1', :search_path => 'user'
  def self.relay(job, options={})
    options = Misc.add_defaults options, :target => 'mn1', :search_path => 'user'

    done_deps = job.dependencies.select do |dep|
      dep.done?
    end

    error_deps = job.dependencies.select do |dep|
      dep.error? && !dep.recoverable_error?
    end

    (done_deps + error_deps).each do |dep|
      Step.migrate(dep.path, options[:search_path], options)
    end
  end
end