#!/usr/bin/env ruby

# Command-line entry point that instantiates a job for a given workflow task
# and then, depending on the options, cleans it, prints its provenance, runs
# it under the selected deploy mode, and prints either its path or its output.

require 'scout'

$0 = "scout #{$previous_commands.any? ? $previous_commands*" " + " " : "" }#{ File.basename(__FILE__) }" if $previous_commands

# --nostream and --update are removed from ARGV here and exported through the
# environment before the remaining options are parsed.
ENV["SCOUT_NO_STREAM"] = "true" if ARGV.include? "--nostream"
ARGV.delete '--nostream'

ENV["SCOUT_UPDATE"] = "true" if ARGV.include? "--update"
ARGV.delete '--update'

# NOTE(review): the heredoc opener and the banner above the option list were
# garbled in the copy under review (only `<]` survived). The summary and
# synopsis lines below are a reconstruction -- confirm against the upstream
# file. The option lines themselves are reproduced as found.
options = SOPT.setup <<EOF

Run a workflow job

$ #{$0} [<options>] <workflow> <task>

-h--help Print this help
--nostream Disable job streaming
--update Update jobs with newer dependencies
--deploy* Deploy mode: serial, local, or SLURM (default 'serial')
-jn--jobname* Name to use as job identifier
-li--load_inputs* Directory with inputs files to load
-pf--printpath Print the file path
-prov--provenance Print the step provenance
-cl--clean Clean the last step
-rcl--recursive_clean Clean all steps
-ct--clean_task* Clean a particular task
-od--override_deps* Override deps using 'Workflow#task=' array_separated
EOF

# The first two positional arguments name the workflow and the task.
workflow_name, task_name = ARGV

raise MissingParameterException.new :workflow if workflow_name.nil?

workflow = Workflow.require_workflow workflow_name
task_name = task_name.to_sym if task_name
task = workflow.tasks[task_name.to_sym] if task_name

# Without a resolvable task there is nothing to run: fall back to the help.
options[:help] = true if task.nil?

# The trailing `:deploy => 'serial'` pair presumably supplies the default
# deploy mode (the help text documents 'serial' as the default).
help, provenance, clean, recursive_clean, clean_task, load_inputs, jobname, printpath, deploy, override_deps =
  IndiferentHash.process_options options,
    :help, :provenance, :clean, :recursive_clean, :clean_task, :load_inputs, :jobname, :printpath, :deploy, :override_deps,
    :deploy => 'serial'

if help
  if defined? scout_usage
    scout_usage
  else
    puts SOPT.doc
  end
  puts workflow.usage(task) if workflow
  exit 0
end

# Task inputs are read through the task's own option definitions, optionally
# merged with inputs loaded from the --load_inputs directory.
job_inputs = task.get_SOPT

if load_inputs
  job_inputs = job_inputs.merge(workflow.tasks[task_name].load_inputs(load_inputs))
end

if override_deps
  override_deps.split($array_separator || ",").each do |part|
    # Split on the first '=' only, so a value that itself contains '=' is
    # kept whole instead of being truncated at its second '='.
    t_, value = part.split("=", 2)
    job_inputs.merge!( t_ => value)
  end
end

job = workflow.job(task_name, jobname, job_inputs)

job.recursive_clean if recursive_clean
job.clean if clean

if clean_task
  ENV["SCOUT_UPDATE"] = 'true'

  # Each comma-separated entry is either `task` or `Workflow#task`.
  clean_task.split(",").each do |spec|
    target_workflow, target_task = spec.include?("#") ? spec.split("#") : [nil, spec]

    # Clean every recursive dependency whose task name matches (and whose
    # workflow matches, when one was given) and mark it as cleaned.
    job.rec_dependencies.each do |dep|
      next unless dep.task_name.to_s == target_task.to_s
      next unless target_workflow.nil? || target_workflow == dep.workflow.to_s
      dep.clean
      dep.set_info :status, :cleaned
    end

    # The job itself is cleaned when it is the named task or when
    # `job.updated?` reports false.
    job.clean if (job.task_name.to_s == target_task.to_s) || ! job.updated?
  end
end

if provenance
  puts Step.prov_report(job)
else
  # Run the job under the requested deploy mode, unless it is already done.
  case deploy
  when "serial"
    job.run(true)
  when "local"
    orchestrator = Workflow::Orchestrator.new 3, "cpus" => Misc.processors
    orchestrator.process({}, job)
  when "slurm"
    require 'rbbt-scout'
    require_relative '../../modules/rbbt-util/lib/rbbt/hpc'
    slurm_options = {}
    HPC::BATCH_MODULE = HPC.batch_system "SLURM"
    HPC::BATCH_MODULE.orchestrate_job(job, slurm_options)
    job.grace
  else
    # Any other value is passed to OffsiteStep as a server name; a trailing
    # '-slurm' additionally sets the `slurm: true` option.
    if deploy.end_with?('-slurm')
      # Strip only the trailing suffix, not an earlier '-slurm' in the name.
      server = deploy.sub(/-slurm\z/, '')
      OffsiteStep.setup(job, server: server, slurm: true)
    else
      OffsiteStep.setup(job, server: deploy)
    end

    job.run
  end unless job.done?

  if printpath
    job.join
    path = job.path
    path = path.find if Path === path
    puts path
  else
    # Copy the job stream to STDOUT; emit a final newline unless the value
    # returned by consume_stream already ends with one.
    if ! ((c = Open.consume_stream(job.stream, false, STDOUT, false)) && c.end_with?("\n"))
      puts
    end
  end
end