lib/rbbt/util/workflow.rb in rbbt-util-2.1.0 vs lib/rbbt/util/workflow.rb in rbbt-util-3.0.2

- old
+ new

@@ -1,90 +1,114 @@ -require 'rake' -require 'rbbt/util/rake' - +require 'rbbt/util/resource' +require 'rbbt/util/task' +require 'rbbt/util/persistence' module WorkFlow - module Runner - def data - $_workflow_prereq + def self.extended(base) + class << base + attr_accessor :tasks, :jobdir, :dangling_options, :dangling_option_descriptions, + :dangling_option_types, :dangling_option_defaults, :dangling_dependencies, :last_task end - def input - $_workflow_input - end + base.extend Resource + base.lib_dir = Resource.caller_base_dir if base.class == Object + base.tasks = {} + base.jobdir = (File.exists?(base.var.find(:lib)) ? base.var.find(:lib) : base.var.find) + base.clear_dangling + end - $_workflow_default_persistence = :string - def default_persistence - $_workflow_default_persistence + def local_persist(*args, &block) + argsv = *args + options = argsv.pop + if Hash === options + options.merge!(:persistence_dir => cache.find(:lib)) + argsv.push options + else + argsv.push options + argsv.push({:persistence_dir => cache.find(:lib)}) end + Persistence.persist(*argsv, &block) + end - def default_persistence=(type) - $_workflow_default_persistence = type - end + def clear_dangling + @dangling_options = [] + @dangling_option_descriptions = {} + @dangling_option_types = {} + @dangling_option_defaults = {} + @dangling_dependencies = nil + end - def step(step_name, options = nil) - dependencies, options = case - when ((String === options or Symbol === options) and %w(string marshal tsv tsv_string).include? options.to_s) - [nil, {:persistence_type => options}] - when Hash === options - [nil, options] - else - [options, {}] - end + def task_option(*args) + name, description, type, default = *args + @dangling_options << name if name + @dangling_option_descriptions[name] = description if description + @dangling_option_types[name] = type if type + @dangling_option_defaults[name] = default if default + end - options = Misc.add_defaults options, :persistence_type => default_persistence - persistence_type = Misc.process_options options, :persistence_type - dependencies = Misc.process_options options, :dependencies if options.include? :dependencies + def task_dependencies(dependencies) + dependencies = [dependencies] unless Array === dependencies + @dangling_dependencies = dependencies + end - re = Regexp.new(/(?:^|\/)#{Regexp.quote step_name.to_s}\/.*$/) + def process_dangling + res = [ + @dangling_options, + Hash[*@dangling_options.zip(@dangling_option_descriptions.values_at(*@dangling_options)).flatten], + Hash[*@dangling_options.zip(@dangling_option_types.values_at(*@dangling_options)).flatten], + Hash[*@dangling_options.zip(@dangling_option_defaults.values_at(*@dangling_options)).flatten], + @dangling_dependencies || @last_task, + ] - @last_step = nil unless defined? @last_step - @last_persistence_type = nil unless defined? @last_persistence_type + clear_dangling + res + end - if dependencies.nil? && ! @last_step.nil? - dependencies = @last_step - end - @last_step = step_name + def task(name, &block) + if Hash === name + persistence = name.values.first + name = name.keys.first + else + persistence = :marshal + end - # Generate the Hash definition - rule_def = case - when dependencies.nil? - re - when String === dependencies || Symbol === dependencies - {re => lambda{|filename| filename.sub(step_name.to_s, dependencies.to_s) }} - when Array === dependencies - {re => lambda{|filename| dependencies.collect{|dep| filename.sub(step_name.to_s, dep.to_s) } }} - when Proc === dependencies - {re => dependencies} - end + options, option_descriptions, option_types, option_defaults, dependencies = process_dangling + option_descriptions.delete_if do |k,v| v.nil? end + option_types.delete_if do |k,v| v.nil? end + option_defaults.delete_if do |k,v| v.nil? end + task = Task.new name, persistence, options, option_descriptions, option_types, option_defaults, self, dependencies, self, &block + tasks[name] = task + @last_task = task + end - @last_step = step_name - last_persistence_type, @last_persistence_type = @last_persistence_type, persistence_type + def job(task, jobname, *args) + task = tasks[task] + raise "Task #{ task } not found" if task.nil? - rule rule_def do |t| - Persistence.persist(t.name, "", persistence_type, :persistence_file => t.name) do - $_workflow_prereq = case - when (t.prerequisites.nil? or (Array === t.prerequisites and t.prerequisites.empty?)) - nil - else - Persistence.persist(t.prerequisites.first, "", last_persistence_type, :persistence_file => t.prerequisites.first) do - raise "Error, this file should be produced already" - end - end - yield - end - end + all_options, option_descriptions, option_types, option_defaults = task.recursive_options + + non_optional_arguments = all_options.reject{|option| option_defaults.include? option} + run_options = nil + + case + when args.length == non_optional_arguments.length + run_options = Hash[*non_optional_arguments.zip(args).flatten].merge option_defaults + when args.length == non_optional_arguments.length + 1 + optional_args = args.pop + run_options = option_defaults. + merge(optional_args). + merge(Hash[*non_optional_arguments.zip(args).flatten]) + else + raise "Number of non optional arguments (#{non_optional_arguments * ', '}) does not match given (#{args.flatten * ", "})" end + + task.job(jobname, run_options) end - def self.run(file = :default, workflow_input = nil, &block) - $_workflow_input = workflow_input - RakeHelper.run("Runtime", file) do - yield - end + def run(*args) + job(*args).run end - def self.load(wf_file, file = :default, workflow_input = nil) - $_workflow_input = workflow_input - RakeHelper.run(wf_file, file) + def load_job(taskname, job_id) + tasks[taskname].load(job_id) end -end +end