#!/usr/bin/env ruby require 'rbbt/util/simpleopt' require 'rbbt/workflow' require 'rbbt/workflow/usage' require 'time' def report_options(options) if options.nil? or options.empty? puts "No options" else options.each do |key, value| puts [Log.color(:cyan, key), Misc.fingerprint(value)] * ": " end end end def usage(workflow = nil, task = nil, exception=nil) puts SOPT.doc puts if workflow.nil? puts "No workflow specified. Use `rbbt workflow list` to list available workflows." exit -1 end if task.nil? workflow.load_tasks if workflow.respond_to? :load_tasks workflow.doc puts puts "E.g. rbbt workflow task #{workflow.to_s} #{workflow.tasks.keys.first.to_s} -h" else puts Log.color :magenta, workflow.to_s puts Log.color :magenta, "=" * workflow.to_s.length if workflow.documentation[:description] and not workflow.documentation[:description].empty? puts puts workflow.documentation[:description] end puts workflow.doc(task) end print_error(exception.message, exception.backtrace) if exception true end def SOPT_options(workflow, task) sopt_options = [] workflow.rec_inputs(task.name).each do |name| short = name.to_s.chars.first boolean = workflow.rec_input_types(task.name)[name].to_sym == :boolean sopt_options << "-#{short}--#{name}#{boolean ? "" : "*"}" end sopt_options * ":" end def get_value_stream(value) if value == "-" io = Misc.open_pipe do |sin| while not STDIN.eof? sin.write STDIN.read(2048) end sin.close end else io = Open.open(value) end class << io attr_accessor :filename end io.filename = value io end def fix_options(workflow, task, job_options) input_types = IndiferentHash.setup workflow.rec_input_types(task.name) input_options = IndiferentHash.setup workflow.rec_input_options(task.name) job_options_cleaned = {} job_options.each do |name, value| value = case input_types[name].to_sym when :boolean TrueClass === value or %w(true TRUE T yes).include? value when :float value.to_f when :integer value.to_i when :text if input_options[name] and input_options[name][:stream] and String === value get_value_stream(value) else case when value == '-' STDIN.read when (String === value and File.exist?(value) and not File.directory?(value)) Open.read(value) else value end end when :array if input_options[name] and input_options[name][:stream] and String === value and Misc.is_filename? value get_value_stream(value) else if Array === value value else array_separator = $array_separator str = case when value == '-' array_separator ||= "\n" STDIN.read when (String === value and File.exist?(value)) array_separator ||= "\n" Open.read(value) else value end if array_separator str.split(/#{array_separator}/) else str.split(/[,|\s]/) end end end when :tsv if input_options[name] and input_options[name][:stream] and String === value TSV::Parser.new(value == '-' ? STDIN : Open.open(value), :filename => value ) else case value when TSV value when '-' TSV.open(STDIN, :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|")) when (Misc.is_filename?(value) and String) TSV.open(value, :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|")) else TSV.open(StringIO.new(value), :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|")) end end else value end job_options_cleaned[name] = value end job_options_cleaned end options = SOPT.setup < [] [] Examine workflows and enact tasks from them. If no `task` is specified, a list of available tasks is shown. If a `task` is given it will be enacted with the parameters specified in `options`. Use *-h* option to display the description of a task, including the parameters it accepts; and some examples, if available. Examples can be enacted using `rbbt workflow example [] []`. When a task is enacted a job is instantiated. This job is identified by the `jobname` (which is *Default* unless specified otherwise) and the values of the parameters. If the same taks is enacted using the same `jobname` and parameters, then the same job will be the same. The first time a job is executed it will save the result. Once the job is done you can re-doit using the `clean` parameter. The `recursive_clean` cleans all the job dependencies recursively. -h--help Show this help -wd--workdir* Change the working directory of the workflow -as--array_separator* Change the character that separates elements of Arrays, ',', '|', or '\\n' by default -fs--field_separator* Change the character that separates fields of TSV files '\\t' by default -jn--jobname* Job name to use. The name 'Default' is used by default -pn--printname Print the name of the job and exit without starting it -pf--printpath Print the path of the job result -cl--clean Clean the last step of the job so that it gets recomputed -rcl--recursive_clean Clean the last step and its dependencies to recompute the job completely --fork Run job asyncronously and monitor progress. It monitors detached processes as well --detach Run job asyncronously and detach process --exec Run job with no persistence -O--output* Save job result into file -jf--job_file* Output one of the job produced files -ljf--list_job_files List all the files produced in that step --load_inputs* Load inputs from a directory --info Show the job info --provenance Report the jobs provenance -W--workflows* Load a list of workflows -R--requires* Require a list of files -rwt--remote_workflow_tasks* Load a yaml file describing remote workflow tasks EOF workflow = ARGV.shift usage and exit -1 if workflow.nil? task = ARGV.shift # Set log, fork, clean, recursive_clean and help help = !!options.delete(:help) do_fork = !!options.delete(:fork) detach = !!options.delete(:detach) do_exec = !!options.delete(:exec) clean = !!options.delete(:clean) recursive_clean = !!options.delete(:recursive_clean) out = options.include?(:output) ? File.open(options[:output], 'wb') : STDOUT $array_separator = options.delete(:array_separator) $field_separator = options.delete(:field_separator) || "\t" # Get workflow if Rbbt.etc.remote_workflows.exists? remote_workflows = Rbbt.etc.remote_workflows.yaml else remote_workflows = {} end workflow = Workflow.require_workflow workflow if options[:workflows] require 'rbbt/workflow' workflows = options[:workflows].split(',') workflows.each do |workflow| workflow.strip! Workflow.require_workflow workflow end end if options[:requires] requires = options[:requires].split(',') requires.each do |req| req.strip! require req end end if options[:remote_workflow_tasks] Workflow.load_remote_tasks(options[:remote_workflow_tasks]) end # Set task namespace = nil, nil case when task.nil? usage workflow and exit 0 else task_name = task.to_sym begin task = workflow.tasks[task_name] raise Workflow::TaskNotFoundException.new workflow, task_name if task.nil? rescue Workflow::TaskNotFoundException usage workflow puts puts Log.color :magenta, "## Error" puts puts $!.message puts exit 0 end end usage workflow, task and exit 0 if help name = options.delete(:jobname) || "Default" # get job args sopt_option_string = SOPT_options(workflow, task) if options[:load_inputs] task_info = workflow.task_info(task_name) job_options = Workflow.load_inputs(options[:load_inputs], task_info[:inputs], task_info[:input_types]).merge(SOPT.get(sopt_option_string)) else job_options = SOPT.get sopt_option_string end job_options = fix_options(workflow, task, job_options) saved_job_options = job_options workflow.workdir = Path.setup(File.expand_path(options.delete(:workdir))) if options[:workdir] #- get job job = workflow.job(task.name, name, job_options) # clean job if clean job.clean sleep 1 end if recursive_clean job.recursive_clean end require 'pp' # run begin if options[:info] pp job.info exit 0 end if do_exec or (job.respond_to?(:is_exec) and job.is_exec) res = job.exec(:stream) case when res.respond_to?(:gets) while block = res.read(2048) out.write block end when Array === res out.puts res * "\n" when TSV === res out.puts res when Hash === res out.puts res.to_yaml when IO === res while block = res.read(2048) out.write block end else out.puts res end exit 0 end if do_fork ENV["RBBT_NO_PROGRESS"] = "true" if detach job.fork Process.detach job.pid if job.pid puts Log.color(:magenta, "Issued: ") + Log.color(:magenta, job.pid ? job.pid.to_s : 'no pid') + ' -- ' + job.path exit 0 end job.fork else job.run(:stream) res = job end if options.delete(:provenance) job.join pp job.provenance_paths exit 0 end if options.delete(:printname) job.join if IO === job.result puts job.name exit 0 end if options.delete(:printpath) job.join raise job.messages.last if job.error? if Open.remote? job.path puts job.url + Log.color(:blue, "?_format=raw") else puts job.path end exit 0 end if do_fork puts space = 1 Log.tty_size ||= 100 while not job.done? message = (job.messages and job.messages.any?) ? job.messages.last.strip : "no message" status = job.status || "no status" if job.info and job.info.include? :issued issued = job.info[:issued] issued = Time.parse(issued) unless Time === issued time = Time.now - issued end space.times do Log.clear_line end puts "#{Log.color :blue, job.path}" str = "Waiting on #{Log.color :blue, job.info[:pid] || job.pid} (#{time ? time.to_i : '?'} sec. ago) " << [Log.color(:cyan, status.to_s),message.strip].compact*" " puts Misc.format_paragraph str, Log.tty_size space = 2 + Log.uncolor(str).length / Log.tty_size sleep 2 end raise job.messages.last if job.error? if job.info and job.info.include? :issued issued = job.info[:issued] issued = Time.parse(issued) unless Time === issued time = Time.now - issued end space.times do Log.clear_line end if Open.remote? job.path out.puts job.path + Log.color(:blue, "?_format=raw") else out.puts job.path end exit 0 end rescue ParameterException SOPT.delete_inputs(workflow.rec_inputs(task.name)) usage(workflow, task, $!) puts Log.color :magenta, "Options:" puts report_options saved_job_options puts exit -1 end if options.delete(:list_job_files) out.puts job.files * "\n" exit 0 end if job_file = options.delete(:job_file) job.join file = job.file(job_file) out.puts Path === file ? file.read : file exit 0 end case res when (defined?(WorkflowRESTClient) and WorkflowRESTClient::RemoteStep) res = job.result if res.respond_to? :gets begin Misc.consume_stream(res, false, out) rescue EOFError, IOError end res.join if res.respond_to? :join elsif res.nil? job.join puts Open.read(job.path, :nocache => true) else puts res.to_s end when Step if res.streaming? io = TSV.get_stream res Misc.consume_stream(io, false, out) io.join if io.respond_to? :join elsif IO === res.result begin io = res.get_stream Misc.consume_stream(io, false, out) io.join if io.respond_to? :join rescue Aborted, Interrupt Log.error "Process interrupted. Aborting step" res.abort begin io.abort if io.respond_to? :abort io.join if io.respond_to? :join ensure exit -1 end rescue Exception Log.exception $! res.abort begin io.abort if io.respond_to? :abort io.join if io.respond_to? :join ensure exit -1 end end else res.join out.puts Open.read(res.path) if File.exist? res.path end else out.puts res.to_s end exit 0