#!/usr/bin/env ruby require 'rbbt/util/simpleopt' require 'rbbt/workflow' require 'rbbt/workflow/usage' def load_inputs(dir, task) inputs = {} dir = Path.setup(dir.dup) task.inputs.each do |input| file = dir[input].find Log.debug "Trying #{ input }: #{file}" next unless file.exists? case task.input_types[input] when :tsv, :array, :text Log.debug "Pointing #{ input } to #{file}" inputs[input.to_sym] = file else Log.debug "Loading #{ input } from #{file}" inputs[input.to_sym] = file.read end end inputs end def report_options(options) if options.nil? or options.empty? puts "No options" else options.each do |key, value| puts [Log.color(:cyan, key), Misc.fingerprint(value)] * ": " end end end def usage(workflow = nil, task = nil, exception=nil) puts SOPT.doc puts Log.color :magenta, "## WORKFLOW" puts if workflow.nil? puts "No workflow specified" exit -1 end if task.nil? workflow.load_tasks if workflow.respond_to? :load_tasks workflow.doc else puts Log.color :magenta, workflow.to_s puts Log.color :magenta, "=" * workflow.to_s.length puts puts workflow.workflow_description puts workflow.doc(task) end print_error(exception.message, exception.backtrace) if exception true end def SOPT_options(workflow, task) sopt_options = [] workflow.rec_inputs(task.name).each do |name| short = name.to_s.chars.first boolean = workflow.rec_input_types(task.name)[name].to_sym == :boolean sopt_options << "-#{short}--#{name}#{boolean ? "" : "*"}" end sopt_options * ":" end def fix_options(workflow, task, job_options) option_types = workflow.rec_input_types(task.name) job_options_cleaned = {} job_options.each do |name, value| value = case option_types[name].to_sym when :boolean TrueClass === true or %w(true TRUE T yes).include? value when :float value.to_f when :integer value.to_i when :text case when value == '-' STDIN.read when (String === value and File.exists?(value) and not File.directory?(value)) Open.read(value) else value end when :array if Array === value value else str = case when value == '-' STDIN.read when (String === value and File.exists?(value)) Open.read(value) else value end if $array_separator str.split(/#{$array_separator}/) else str.split(/[,|\s]/) end end when :tsv case value when TSV value when '-' TSV.open(STDIN, :unnamed => true) else TSV.open(value, :unnamed => true) end else value end job_options_cleaned[name] = value end job_options_cleaned end options = SOPT.get <<EOF -h--help Show this help: -wd--workdir* Change the working directory of the workflow: -as--array_separator* Change the character that separates elements of Arrays, ',', '|', or '\\n' by default: -jn--jobname* Job name to use. The name 'Default' is used by default: -pn--printname Print the name of the job and exit without starting it: -cl--clean Clean the last step of the job so that it gets recomputed: -rcl--recursive_clean Clean the last step and its dependencies to recompute the job completely: --fork Run job asyncronously: --exec Run job with no persistence: -O--output* Save job result into file: -jf--job_file* Output one of the job produced files: -ljf--list_job_files List all the files produced in that step: --load_inputs* Load inputs from a directory --info Show the job info: --provenance Report the jobs provenance: EOF workflow = ARGV.shift usage and exit -1 if workflow.nil? task = ARGV.shift # Set log, fork, clean, recursive_clean and help help = !!options.delete(:help) do_fork = !!options.delete(:fork) do_exec = !!options.delete(:exec) clean = !!options.delete(:clean) recursive_clean = !!options.delete(:recursive_clean) out = options.include?(:output) ? File.open(options[:output], 'wb') : STDOUT $array_separator = options.delete(:array_separator) # Get workflow if Rbbt.etc.remote_workflows.exists? remote_workflows = Rbbt.etc.remote_workflows.yaml else remote_workflows = {} end if remote_workflows.include? workflow require 'rbbt/rest/client' workflow = WorkflowRESTClient.new remote_workflows[workflow], workflow else Workflow.require_workflow workflow workflow = Workflow.workflows.select{|mod| Misc.snake_case(mod.to_s) == Misc.snake_case(workflow)}.first workflow = Workflow.workflows.last if workflow.nil? end # Set task namespace = nil, nil case when task.nil? usage workflow and exit 0 # Can not remember what this was #when (task =~ /\./) # namespace, task = options.delete(:task).split('.') # namespace = Misc.string2const(namespace) else task_name = task.to_sym task = workflow.tasks[task_name] raise ParameterException, "Task not found: #{ task_name }" if task.nil? end usage workflow, task and exit 0 if help name = options.delete(:jobname) || "Default" # get job args sopt_option_string = SOPT_options(workflow, task) if options[:load_inputs] job_options = load_inputs(options[:load_inputs], task) else job_options = SOPT.get sopt_option_string end job_options = fix_options(workflow, task, job_options) saved_job_options = job_options workflow.workdir = Path.setup(File.expand_path(options.delete(:workdir))) if options[:workdir] #- get job job = workflow.job(task.name, name, job_options) # clean job if clean job.clean sleep 1 job = workflow.job(task.name, name, job_options) end if recursive_clean job.recursive_clean sleep 1 job = workflow.job(task.name, name, job_options) end require 'pp' # run begin if options[:info] pp job.info exit 0 end if do_exec res = job.exec case when Array === res out.puts res * "\n" when TSV === res out.puts res when Hash === res out.puts res.to_yaml else out.puts res end exit 0 end if do_fork job.fork puts while not job.done? message = job.messages ? job.messages.last : "no message" status = job.status puts("No status yet") and next if status.nil? puts Log.return_line + "Waiting on #{job.pid} " + [Log.color(:magenta, status.to_s),message]*" " + " " * 100 sleep 2 end Signal.trap(:INT){ job.abort } raise job.messages.last if job.error? res = job.load else res = job.run(true) end if options.delete(:provenance) pp job.provenance exit 0 end if options.delete(:printname) puts job.name exit 0 else Log.low "Job name: #{job.name}" end rescue ParameterException SOPT.delete_inputs(workflow.rec_inputs(task.name)) usage(workflow, task, $!) puts Log.color :magenta, "Options:" puts report_options saved_job_options puts exit -1 end if options.delete(:list_job_files) out.puts job.files * "\n" exit 0 end if job_file = options.delete(:job_file) file = job.file(job_file) out.puts Path === file ? file.read : file exit 0 end if Step === res out.puts Open.read(res.path) if File.exists? res.path else out.puts res end exit 0