#!/usr/bin/env ruby

require 'rbbt/util/simpleopt'
require 'rbbt/workflow'
require 'rbbt/workflow/usage'
require 'time'
require 'pp'

# Prints every job option as a colored "key: fingerprint" line, or
# "No options" when +options+ is nil or empty.
def report_options(options)
  if options.nil? || options.empty?
    puts "No options"
  else
    options.each do |key, value|
      puts [Log.color(:cyan, key), Misc.fingerprint(value)] * ": "
    end
  end
end

# Prints the command documentation followed by workflow- or task-level help.
#
# workflow  - the loaded workflow; when nil a hint is printed and the
#             process exits with status -1.
# task      - the task to document; when nil the whole workflow is documented
#             and an example invocation is printed.
# exception - optional exception whose message and backtrace are reported
#             after the help text.
#
# Returns true, so callers can follow the call with an exit.
def usage(workflow = nil, task = nil, exception = nil)
  puts SOPT.doc
  puts

  if workflow.nil?
    puts "No workflow specified. Use `rbbt workflow list` to list available workflows."
    exit(-1)
  end

  if task.nil?
    workflow.load_tasks if workflow.respond_to? :load_tasks
    workflow.doc
    puts
    puts "E.g. rbbt workflow task #{workflow.to_s} #{workflow.tasks.keys.first.to_s} -h"
  else
    puts Log.color :magenta, workflow.to_s
    puts Log.color :magenta, "=" * workflow.to_s.length
    puts
    puts workflow.documentation[:description]
    puts
    workflow.doc(task)
  end

  # NOTE(review): print_error is not defined in this file; presumably it is
  # provided by the launcher that loads this script -- confirm.
  print_error(exception.message, exception.backtrace) if exception

  true
end

# Builds the SOPT option string for all (recursive) inputs of +task+.
#
# Each input becomes "-<first letter>--<name>"; non-boolean inputs get a
# trailing "*". Entries are joined with ":".
def SOPT_options(workflow, task)
  # The type table does not depend on the input being visited, so it is
  # fetched once instead of once per input.
  input_types = workflow.rec_input_types(task.name)

  sopt_options = workflow.rec_inputs(task.name).collect do |name|
    short = name.to_s.chars.first
    boolean = input_types[name].to_sym == :boolean
    "-#{short}--#{name}#{boolean ? "" : "*"}"
  end

  sopt_options * ":"
end

# Opens +value+ as a stream and tags the stream with a +filename+ accessor
# set to +value+.
#
# When +value+ is "-", STDIN is copied in 2048-byte chunks through
# Misc.open_pipe; otherwise the stream comes from Open.open(value).
def get_value_stream(value)
  io = if value == "-"
         Misc.open_pipe do |sin|
           sin.write STDIN.read(2048) until STDIN.eof?
           sin.close
         end
       else
         Open.open(value)
       end

  # Singleton accessor so only this particular stream carries a filename.
  class << io
    attr_accessor :filename
  end
  io.filename = value

  io
end

# Converts raw command-line values into the types declared by the task inputs.
#
# workflow    - workflow that owns the task.
# task        - task whose (recursive) input types and options are consulted.
# job_options - Hash of input name => raw value.
#
# Conversion by declared type:
#   :boolean - true for +true+ or one of "true", "TRUE", "T", "yes".
#   :float   - value.to_f
#   :integer - value.to_i
#   :text    - a stream for inputs flagged :stream; otherwise STDIN for "-",
#              the file content when the value names an existing regular
#              file, or the value itself.
#   :array   - a stream for inputs flagged :stream; Arrays pass through;
#              otherwise the text (STDIN, file content or the value itself)
#              is split on $array_separator, or on /[,|\s]/ when none is set.
#              Reading from STDIN or a file defaults $array_separator to "\n".
#   :tsv     - a TSV::Parser for inputs flagged :stream; TSV objects pass
#              through; otherwise TSV.open on STDIN ("-") or on the value.
#   other    - left untouched.
#
# Returns a new Hash with the converted values.
def fix_options(workflow, task, job_options)
  input_types = IndiferentHash.setup workflow.rec_input_types(task.name)
  input_options = IndiferentHash.setup workflow.rec_input_options(task.name)

  job_options_cleaned = {}

  job_options.each do |name, value|
    value = case input_types[name].to_sym
            when :boolean
              (TrueClass === value) || %w(true TRUE T yes).include?(value)
            when :float
              value.to_f
            when :integer
              value.to_i
            when :text
              if input_options[name] && input_options[name][:stream] && (String === value)
                get_value_stream(value)
              elsif value == '-'
                STDIN.read
              elsif (String === value) && File.exist?(value) && !File.directory?(value)
                # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
                Open.read(value)
              else
                value
              end
            when :array
              if input_options[name] && input_options[name][:stream] && (String === value)
                get_value_stream(value)
              elsif Array === value
                value
              else
                str = if value == '-'
                        $array_separator ||= "\n"
                        STDIN.read
                      elsif (String === value) && File.exist?(value)
                        $array_separator ||= "\n"
                        Open.read(value)
                      else
                        value
                      end

                if $array_separator
                  # NOTE(review): the separator is interpolated as a regular
                  # expression, so a metacharacter such as "|" yields an
                  # empty-matching pattern -- confirm whether that is intended.
                  str.split(/#{$array_separator}/)
                else
                  str.split(/[,|\s]/)
                end
              end
            when :tsv
              if input_options[name] && input_options[name][:stream] && (String === value)
                TSV::Parser.new(value == '-' ? STDIN : Open.open(value), :filename => value)
              else
                case value
                when TSV
                  value
                when '-'
                  TSV.open(STDIN, :unnamed => true, :sep => $field_separator, :sep2 => $array_separator || "|")
                else
                  TSV.open(value, :unnamed => true, :sep => $field_separator, :sep2 => $array_separator || "|")
                end
              end
            else
              value
            end

    job_options_cleaned[name] = value
  end

  job_options_cleaned
end

# NOTE(review): the heredoc opener, its summary line and the <...>
# placeholders in the synopsis/example were lost in the collapsed source and
# are reconstructed here; paragraph breaks are a best guess -- confirm against
# the upstream file.
options = SOPT.setup <<EOF
Enact workflow tasks

$ rbbt workflow task <workflow> [<task>] [<options>]

Examine workflows and enact tasks from them. If no `task` is specified, a list
of available tasks is shown. If a `task` is given it will be enacted with the
parameters specified in `options`. Use *-h* option to display the description
of a task, including the parameters it accepts; and some examples, if available.
Examples can be enacted using `rbbt workflow example <workflow> [<task>] [<example>]`.

When a task is enacted a job is instantiated. This job is identified by the
`jobname` (which is *Default* unless specified otherwise) and the values of the
parameters. If the same taks is enacted using the same `jobname` and parameters,
then the same job will be the same.

The first time a job is executed it will save the result. Once the job is done
you can re-doit using the `clean` parameter. The `recursive_clean` cleans all
the job dependencies recursively.

-h--help Show this help:
-wd--workdir* Change the working directory of the workflow:
-as--array_separator* Change the character that separates elements of Arrays, ',', '|', or '\\n' by default:
-fs--field_separator* Change the character that separates fields of TSV files '\\t' by default:
-jn--jobname* Job name to use. The name 'Default' is used by default:
-pn--printname Print the name of the job and exit without starting it:
-pf--printpath Print the path of the job result:
-cl--clean Clean the last step of the job so that it gets recomputed:
-rcl--recursive_clean Clean the last step and its dependencies to recompute the job completely:
--fork Run job asyncronously and monitor progress. It monitors detached processes as well:
--detach Run job asyncronously and detach process:
--exec Run job with no persistence:
-O--output* Save job result into file:
-jf--job_file* Output one of the job produced files:
-ljf--list_job_files List all the files produced in that step:
--load_inputs* Load inputs from a directory
--info Show the job info:
--provenance Report the jobs provenance:
EOF

workflow = ARGV.shift
if workflow.nil?
  usage # exits with -1 itself when no workflow is given
  exit(-1)
end

task = ARGV.shift

# Set log, fork, clean, recursive_clean and help
help = !!options.delete(:help)
do_fork = !!options.delete(:fork)
detach = !!options.delete(:detach)
do_exec = !!options.delete(:exec)
clean = !!options.delete(:clean)
recursive_clean = !!options.delete(:recursive_clean)
out = options.include?(:output) ? File.open(options[:output], 'wb') : STDOUT

$array_separator = options.delete(:array_separator)
$field_separator = options.delete(:field_separator) || "\t"

# Get workflow

if Rbbt.etc.remote_workflows.exists?
  remote_workflows = Rbbt.etc.remote_workflows.yaml
else
  remote_workflows = {}
end

workflow = Workflow.require_workflow workflow

# Set task

if task.nil?
  usage workflow
  exit 0
else
  task_name = task.to_sym
  begin
    task = workflow.tasks[task_name]
    raise Workflow::TaskNotFoundException.new workflow, task_name if task.nil?
  rescue Workflow::TaskNotFoundException
    usage workflow
    puts
    puts Log.color :magenta, "## Error"
    puts
    puts $!.message
    puts
    exit 0
  end
end

if help
  usage workflow, task
  exit 0
end

name = options.delete(:jobname) || "Default"

# get job args
sopt_option_string = SOPT_options(workflow, task)
if options[:load_inputs]
  task_info = workflow.task_info(task_name)
  # Values given on the command line override the ones loaded from the directory.
  job_options = Workflow.load_inputs(options[:load_inputs], task_info[:inputs], task_info[:input_types]).merge(SOPT.get(sopt_option_string))
else
  job_options = SOPT.get sopt_option_string
end

job_options = fix_options(workflow, task, job_options)
saved_job_options = job_options

workflow.workdir = Path.setup(File.expand_path(options.delete(:workdir))) if options[:workdir]

#- get job

job = workflow.job(task.name, name, job_options)

# clean job
if clean
  job.clean
  sleep 1
end

if recursive_clean
  job.recursive_clean
  sleep 1
  # Re-instantiate the job after its dependencies have been cleaned.
  job = workflow.job(task.name, name, job_options)
end

# run
begin
  if options[:info]
    pp job.info
    exit 0
  end

  if do_exec || (job.respond_to?(:is_exec) && job.is_exec)
    res = job.exec(true)
    case
    when Array === res
      out.puts res * "\n"
    when TSV === res
      out.puts res
    when Hash === res
      out.puts res.to_yaml
    when IO === res
      while block = res.read(2048)
        out.write block
      end
    else
      out.puts res
    end
    exit 0
  end

  if do_fork
    if detach
      job.fork
      Process.detach job.pid if job.pid
      puts Log.color(:magenta, "Issued: ") + Log.color(:magenta, job.pid ? job.pid.to_s : 'no pid') + ' -- ' + job.path
      exit 0
    end

    # On Ctrl-C abort the job only when the recorded pid is the one we forked.
    Signal.trap(:INT) do
      begin
        job.abort if job.info[:pid] == job.pid
      rescue
        # Best effort: errors while aborting are deliberately ignored.
      end
      exit 0
    end

    job.fork
  else
    job.run(true)
    res = job
  end

  if options.delete(:provenance)
    job.join
    pp job.provenance_paths
    exit 0
  end

  if options.delete(:printname)
    job.join if IO === job.result
    puts job.name
    exit 0
  end

  if options.delete(:printpath)
    job.join.load
    puts job.path
    exit 0
  end

  if do_fork
    puts
    space = 1 # number of terminal lines to clear before redrawing the status
    Log.tty_size ||= 100

    until job.done?
      message = (job.messages && job.messages.any?) ? job.messages.last.strip : "no message"
      status = job.status || "no status"
      if job.info && job.info.include?(:issued)
        issued = job.info[:issued]
        issued = Time.parse(issued) unless Time === issued
        time = Time.now - issued
      end

      space.times do
        Log.clear_line
      end

      puts "#{Log.color :blue, job.path}"
      str = "Waiting on #{Log.color :blue, job.info[:pid] || job.pid} (#{time ? time.to_i : '?'} sec. ago) " << [Log.color(:cyan, status.to_s),message.strip].compact*" "
      puts Misc.format_paragraph str, Log.tty_size
      space = 2 + Log.uncolor(str).length / Log.tty_size
      sleep 2
    end

    raise job.messages.last if job.error?

    space.times do
      Log.clear_line
    end

    out.puts job.path

    exit 0
  end
rescue ParameterException
  SOPT.delete_inputs(workflow.rec_inputs(task.name))
  usage(workflow, task, $!)
  puts Log.color :magenta, "Options:"
  puts
  report_options saved_job_options
  puts
  exit(-1)
end

if options.delete(:list_job_files)
  out.puts job.files * "\n"
  exit 0
end

if job_file = options.delete(:job_file)
  file = job.file(job_file)
  out.puts Path === file ? file.read : file
  exit 0
end

case res
when (defined?(WorkflowRESTClient) and WorkflowRESTClient::RemoteStep)
  out.puts res.load
when Step
  if IO === res.result
    begin
      io = res.result
      Thread.pass while IO.select([io]).nil?
      while block = io.read(2048) do
        out.write block
      end unless io.closed?
      io.join if io.respond_to? :join
    rescue Exception
      # Deliberately broad: any failure (including interrupts) while streaming
      # the result aborts the stream and exits with an error status.
      Log.exception $!
      begin
        io.abort if io.respond_to? :abort
        io.join
      ensure
        exit(-1)
      end
    end
  else
    res.join
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    out.puts Open.read(res.path) if File.exist? res.path
  end
else
  out.puts res.to_s
end

exit 0