#!/usr/bin/env ruby
#
# rbbt workflow task: examine a workflow and enact one of its tasks from the
# command line (see the SOPT help text below for the accepted options).

require 'rbbt/util/simpleopt'
require 'rbbt/workflow'
require 'rbbt/workflow/usage'
require 'time'
require 'json'
require 'stringio'
require 'pp'

# Print each job option as "key: fingerprint", or "No options" when the
# collection is nil or empty.
#
# @param options [Hash, nil] input name => value
def report_options(options)
  if options.nil? or options.empty?
    puts "No options"
  else
    options.each do |key, value|
      puts([Log.color(:cyan, key), Misc.fingerprint(value)] * ": ")
    end
  end
end

# Print the command help followed by workflow or task documentation.
#
# Exits with status -1 when no workflow is given. Without a task it prints the
# workflow's documentation and an example invocation; with a task it prints the
# workflow title/description and the task documentation.
#
# @param workflow [Workflow, nil]
# @param task [Task, nil]
# @param exception [Exception, nil] reported via +print_error+ when present
#   (NOTE(review): +print_error+ is not defined in this file; presumably it is
#   provided by one of the required libraries -- confirm)
# @param abridge [Boolean] forwarded to +Workflow#doc+
# @return [true] so callers can write `usage(...) and exit 0`
def usage(workflow = nil, task = nil, exception = nil, abridge = false)
  puts SOPT.doc
  puts
  if workflow.nil?
    puts "No workflow specified. Use `rbbt workflow list` to list available workflows."
    exit(-1)
  end

  if task.nil?
    workflow.load_tasks if workflow.respond_to? :load_tasks
    workflow.doc nil, abridge
    puts
    puts "E.g. rbbt workflow task #{workflow.to_s} #{workflow.tasks.keys.first.to_s} -h"
  else
    puts Log.color :magenta, workflow.to_s
    puts Log.color :magenta, "=" * workflow.to_s.length
    if workflow.documentation[:title] and not workflow.documentation[:title].empty?
      puts
      puts Misc.format_paragraph workflow.documentation[:title]
    end
    if workflow.documentation[:description] and not workflow.documentation[:description].empty?
      puts
      puts Misc.format_paragraph workflow.documentation[:description]
    end
    puts workflow.doc(task, abridge)
  end

  print_error(exception.message, exception.backtrace) if exception

  true
end

# Open +value+ as a readable IO. The special value '-' copies STDIN through a
# pipe in 2048-byte chunks; anything else is opened with +Open.open+.
# The returned IO gets a +filename+ accessor set to +value+.
#
# @param value [String] a path, or '-' for STDIN
# @return [IO]
def get_value_stream(value)
  if value == "-"
    io = Misc.open_pipe do |sin|
      while not STDIN.eof?
        sin.write STDIN.read(2048)
      end
      sin.close
    end
  else
    io = Open.open(value)
  end

  class << io
    attr_accessor :filename
  end
  io.filename = value
  io
end

# Convert raw command-line inputs into the values the task inputs expect.
#
# Coercion follows the recursive input types of +task+:
# * :boolean, :float and :integer values are parsed from strings.
# * :text, :array and :tsv values may be read from a file path or from STDIN
#   ('-'), or opened as streams when the input declares the :stream option.
# * :directory values are expanded to absolute paths.
#
# @param workflow [Workflow]
# @param task [Task] task whose input types/options drive the coercion
# @param job_options [Hash] input name => raw value
# @return [Hash] input name => coerced value
def fix_options(workflow, task, job_options)
  input_types = IndiferentHash.setup workflow.rec_input_types(task.name)
  input_options = IndiferentHash.setup workflow.rec_input_options(task.name)

  job_options_cleaned = {}

  job_options.each do |name, value|
    type = input_types[name]
    type = type.to_sym if type

    value =
      case type
      when nil
        value
      when :boolean
        TrueClass === value or %w(true TRUE T yes).include? value
      when :float
        value.to_f
      when :integer
        value.to_i
      when :text
        if input_options[name] and input_options[name][:stream] and String === value
          get_value_stream(value)
        else
          case
          when value == '-'
            STDIN.read
          when (String === value and File.exist?(value) and not File.directory?(value))
            Open.read(value)
          else
            value
          end
        end
      when :array
        if input_options[name] && input_options[name][:stream] && String === value && Misc.is_filename?(value) && !!input_options[name][:nofile]
          get_value_stream(value)
        elsif input_options[name] and input_options[name][:stream] and value == "-"
          STDIN
        elsif Array === value
          value
        else
          array_separator = $array_separator
          str = case
                when value == '-'
                  array_separator ||= "\n"
                  STDIN.read
                when (String === value and File.exist?(value) and not (input_options[name] && input_options[name][:nofile]))
                  array_separator ||= "\n"
                  Open.read(value)
                else
                  value
                end

          if array_separator
            str.split(/#{array_separator}/)
          else
            str.split(/[,|\s]/)
          end
        end
      when :tsv
        if input_options[name] and input_options[name][:stream] and String === value
          TSV::Parser.new(value == '-' ? STDIN : Open.open(value), :filename => value)
        else
          case
          when TSV === value
            value
          when value == '-'
            TSV.open(STDIN, :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|"))
          when (String === value and Misc.is_filename?(value))
            TSV.open(value, :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|"))
          else
            TSV.open(StringIO.new(value), :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|"))
          end
        end
      when :directory
        Path.setup(File.expand_path(value))
      else
        value
      end

    job_options_cleaned[name] = value
  end

  job_options_cleaned
end

# Abort +step+ and, if present, its output stream +io+, then exit with status -1.
# The exit happens in an +ensure+ so it runs even when aborting the stream raises.
def abort_step_and_exit(step, io)
  step.abort
  begin
    io.abort if io.respond_to? :abort
    io.join if io.respond_to? :join
  ensure
    exit(-1)
  end
end

# NOTE(review): the <workflow>, <task>, <options>, <example> and <path>
# placeholders below were reconstructed after being stripped from this help
# text; confirm the exact wording against the upstream command.
options = SOPT.setup <<EOF

$ rbbt workflow task <workflow> [<task>] [<options>]

Examine workflows and enact tasks from them. If no `task` is specified, a list
of available tasks is shown. If a `task` is given it will be enacted with the
parameters specified in `options`.

Use *-h* option to display the description of a task, including the parameters
it accepts; and some examples, if available. Examples can be enacted using
`rbbt workflow example <workflow> [<task>] [<example>]`.

When a task is enacted a job is instantiated. This job is identified by the
`jobname` (which is *Default* unless specified otherwise) and the values of the
parameters; these two things determine the filename under which the job result
will be saved. If the task is enacted using the same `jobname` and parameters it
will result in the same job, pointing to the same result file.

The first time a job is executed it will save the result. The saved result will
be returned directly if the same task is re-enacted. Once the job is done you
can redo it using the `clean` parameter, this cleans the last step of the task.
The `recursive_clean` cleans all the job dependency steps recursively.

-h--help Show this help
-ha--abridge Abridge help
-wd--workdir* Change the working directory of the workflow
-wda--workdir_all* Change the working directory of ALL workflow
-as--array_separator* Change the character that separates elements of Arrays, ',', '|', or '\\n' by default
-fs--field_separator* Change the character that separates fields of TSV files '\\t' by default
-jn--jobname* Job name to use. The name 'Default' is used by default
-pn--printname Print the name of the job and exit without starting it
-pf--printpath Print the path of the job result
-cl--clean Clean the last step of the job so that it gets recomputed
-ct--clean_task* Clean a particular dependency task
-rcl--recursive_clean Clean the last step and its dependencies to recompute the job completely
-uaj--update_all_jobs Consider all dependencies when checking for updates, even when they have no info files
--fork Run job asynchronously and monitor progress. It monitors detached processes as well
--detach Run job asynchronously and detach process
--exec Run job with no persistence
-O--output* Save job result into file
-jf--job_file* Output one of the job produced files
-ljf--list_job_files List all the files produced in that step
--load_inputs* Load inputs from a directory
--info Show the job info
-prov--provenance Report the jobs provenance
-W--workflows* Load a list of workflows
-R--requires* Require a list of files
-pre--prepare* Prepare dependencies
-prec--prepare_cpus* Number of dependencies prepared in parallel
-rwt--remote_workflow_tasks* Load a yaml file describing remote workflow tasks
-od--override_deps* Override deps using 'Workflow#task=<path>' array_separated
EOF

workflow = ARGV.shift
usage and exit(-1) if workflow.nil?

task = ARGV.shift

# Set log, fork, clean, recursive_clean and help
help = !!options.delete(:help)
do_fork = !!options.delete(:fork)
detach = !!options.delete(:detach)
do_exec = !!options.delete(:exec)
clean = !!options.delete(:clean)
clean_task = options.delete(:clean_task)
override_deps = options.delete(:override_deps)
recursive_clean = !!options.delete(:recursive_clean)
out = options.include?(:output) ? File.open(options[:output], 'wb') : STDOUT

$array_separator = options.delete(:array_separator)
$field_separator = options.delete(:field_separator) || "\t"

# Get workflow
Workflow.workdir = Path.setup(File.expand_path(options.delete(:workdir_all))) if options[:workdir_all]

workflow = Workflow.require_workflow workflow

ENV["RBBT_UPDATE"] = 'true' if clean_task

if options[:update_all_jobs]
  ENV["RBBT_UPDATE_ALL_JOBS"] = 'true'
  ENV["RBBT_UPDATE"] = 'true'
end

if options[:workflows]
  options[:workflows].split(',').each do |extra_workflow|
    Workflow.require_workflow extra_workflow.strip
  end
end

if options[:requires]
  options[:requires].split(',').each do |req|
    require req.strip
  end
end

if options[:remote_workflow_tasks]
  Workflow.load_remote_tasks(options[:remote_workflow_tasks])
end

# Set task
case
when task.nil?
  usage workflow, nil, nil, options[:abridge] and exit 0
else
  task_name = task.to_sym
  begin
    task = workflow.tasks[task_name]
    raise Workflow::TaskNotFoundException.new workflow, task_name if task.nil?
  rescue Workflow::TaskNotFoundException
    usage workflow, nil, nil, options[:abridge]
    puts
    puts Log.color :magenta, "## Error"
    puts
    puts $!.message
    puts
    # An unknown task is an error: report it with a non-zero status, like the
    # "no workflow specified" path does.
    exit(-1)
  end
end

usage workflow, task, nil, options[:abridge] and exit 0 if help

name = options.delete(:jobname)

# get job args
job_options = workflow.get_SOPT(task)

if options[:load_inputs]
  task_info = workflow.task_info(task_name)
  job_options = Workflow.load_inputs(options[:load_inputs], task_info[:inputs], task_info[:input_types]).merge(job_options)
end

job_options = fix_options(workflow, task, job_options)
saved_job_options = job_options

workflow.workdir = Path.setup(File.expand_path(options.delete(:workdir))) if options[:workdir]

if override_deps
  override_deps.split($array_separator || ",").each do |part|
    # Split on the first '=' only so that override values containing '=' are kept whole
    dep_spec, value = part.split("=", 2)
    job_options.merge!(dep_spec => value)
  end
end

#- get job
job = workflow.job(task.name, name, job_options)
$job = job

# clean job
if clean
  job.clean
  sleep 1
end

if clean_task
  # Each entry is either 'task' or 'Workflow#task'
  clean_task.split(",").each do |spec|
    clean_workflow = nil
    dep_task_name = spec
    clean_workflow, dep_task_name = spec.split("#") if spec.include? "#"

    job.rec_dependencies.each do |dep|
      next unless dep.task_name.to_s == dep_task_name.to_s
      next unless clean_workflow.nil? || clean_workflow == dep.workflow.to_s
      dep.clean
      dep.set_info :status, :cleaned
    end
  end
end

job.recursive_clean if recursive_clean

# run
begin
  if options[:info]
    pp job.info
    exit 0
  end

  if options.delete(:printname)
    puts job.name
    exit 0
  end

  if do_exec or (job.respond_to?(:is_exec) and job.is_exec)
    res = job.exec(:stream)
    result_type = job.result_type
    res = JSON.parse(res.read) if (defined?(RemoteStep) and RemoteStep === job) && %w(array float integer boolean).include?(result_type.to_s)

    # Any IO responds to :gets, so streams (including plain IO objects) are
    # handled by the first branch.
    case
    when res.respond_to?(:gets)
      begin
        Misc.consume_stream(res, false, out)
      rescue EOFError, IOError
      end
      res.join if res.respond_to? :join
    when Array === res
      out.puts res * "\n"
    when TSV === res
      out.puts res
    when Hash === res
      out.puts res.to_yaml
    else
      out.puts res
    end
    exit 0
  end

  if options.delete(:provenance)
    if options.delete(:printpath)
      puts job.path
    else
      puts Step.prov_report(job)
    end
    exit 0
  end

  if (tasks = options.delete(:prepare))
    tasks = tasks.split(",")
    # SOPT option values are strings; the number of CPUs must be numeric
    prepare_cpus = (options[:prepare_cpus] || 1).to_i
    puts Step.prepare_dependencies(job, tasks, prepare_cpus)
    exit 0
  end

  if do_fork
    ENV["RBBT_NO_PROGRESS"] = "true"
    if detach
      job.fork
      Process.detach job.pid if job.pid
      puts Log.color(:magenta, "Issued: ") + Log.color(:magenta, job.pid ? job.pid.to_s : 'no pid') + ' -- ' + job.path
      exit 0
    end

    job.fork
  else
    job.run(:stream)
    res = job
  end

  if options.delete(:printpath)
    job.join
    raise job.messages.last if (job.error? || job.aborted?) && job.messages
    if Open.remote? job.path
      puts job.url + Log.color(:blue, "?_format=raw")
    else
      puts job.path
    end
    exit 0
  end

  if do_fork
    puts
    space = 1
    Log.tty_size ||= 100

    # Poll the forked job, redrawing a status paragraph every two seconds
    while not job.done?
      message = (job.messages and job.messages.any?) ? job.messages.last.strip : "no message"
      status = job.status || "no status"
      if job.info and job.info.include? :issued
        issued = job.info[:issued]
        issued = Time.parse(issued) unless Time === issued
        time = Time.now - issued
      end

      space.times do
        Log.clear_line
      end

      puts "#{Log.color :blue, job.path}"
      str = "Waiting on #{Log.color :blue, job.info[:pid] || job.pid} (#{time ? time.to_i : '?'} sec. ago) " << [Log.color(:cyan, status.to_s), message.strip].compact * " "
      puts Misc.format_paragraph str, Log.tty_size

      # Number of terminal lines to clear on the next redraw
      space = 2 + Log.uncolor(str).length / Log.tty_size
      sleep 2
    end

    if job.error?
      # Avoid `raise nil` (a TypeError) when the failed job left no messages
      raise((job.messages && job.messages.last) || "Job error: #{job.path}")
    end

    space.times do
      Log.clear_line
    end

    if Open.remote?(job.path)
      out.puts job.path + Log.color(:blue, "?_format=raw")
    else
      out.puts job.path
    end
    exit 0
  end
rescue ParameterException
  SOPT.delete_inputs(workflow.rec_inputs(task.name))
  usage(workflow, task, $!)
  puts Log.color :magenta, "Options:"
  puts
  report_options saved_job_options
  puts
  exit(-1)
end

if options.delete(:list_job_files)
  out.puts job.files * "\n"
  exit 0
end

if (job_file = options.delete(:job_file))
  job.join
  file = job.file(job_file)
  out.puts((Path === file) ? file.read : file)
  exit 0
end

case res
when (defined?(WorkflowRemoteClient) and WorkflowRemoteClient::RemoteStep)
  res = job.result
  if res.respond_to? :gets
    begin
      Misc.consume_stream(res, false, out)
    rescue EOFError, IOError
    end
    res.join if res.respond_to? :join
  elsif res.nil?
    job.join
    raise job.get_exception if job.error? || job.aborted?
    # Write to `out` like every other branch so --output is honoured
    out.puts Open.read(job.path, :nocache => true, :nofail => true)
  elsif Array === res
    out.puts res * "\n"
  else
    out.puts res.to_s
  end
when Step
  if res.streaming?
    io = TSV.get_stream res
    Misc.consume_stream(io, false, out)
    io.join if io.respond_to? :join
  elsif IO === res.result
    begin
      io = res.get_stream
      Misc.consume_stream(io, false, out)
      io.join if io.respond_to? :join
    rescue Aborted, Interrupt
      Log.error "Process interrupted. Aborting step"
      abort_step_and_exit(res, io)
    rescue Exception
      # Deliberately broad: any failure while streaming must abort the step
      Log.exception $!
      abort_step_and_exit(res, io)
    end
  else
    res.join
    out.puts Open.read(res.path) if Open.exist?(res.path) || Open.remote?(res.path) || Open.ssh?(res.path)
  end
else
  if Array === res
    out.puts res * "\n"
  else
    out.puts res.to_s
  end
end

exit 0