lib/rbbt/util/workflow.rb in rbbt-util-2.1.0 vs lib/rbbt/util/workflow.rb in rbbt-util-3.0.2
- old
+ new
@@ -1,90 +1,114 @@
-require 'rake'
-require 'rbbt/util/rake'
-
+require 'rbbt/util/resource'
+require 'rbbt/util/task'
+require 'rbbt/util/persistence'
module WorkFlow
- module Runner
- def data
- $_workflow_prereq
+ def self.extended(base)
+ class << base
+ attr_accessor :tasks, :jobdir, :dangling_options, :dangling_option_descriptions,
+ :dangling_option_types, :dangling_option_defaults, :dangling_dependencies, :last_task
end
- def input
- $_workflow_input
- end
+ base.extend Resource
+ base.lib_dir = Resource.caller_base_dir if base.class == Object
+ base.tasks = {}
+ base.jobdir = (File.exists?(base.var.find(:lib)) ? base.var.find(:lib) : base.var.find)
+ base.clear_dangling
+ end
- $_workflow_default_persistence = :string
- def default_persistence
- $_workflow_default_persistence
+ def local_persist(*args, &block)
+ argsv = *args
+ options = argsv.pop
+ if Hash === options
+ options.merge!(:persistence_dir => cache.find(:lib))
+ argsv.push options
+ else
+ argsv.push options
+ argsv.push({:persistence_dir => cache.find(:lib)})
end
+ Persistence.persist(*argsv, &block)
+ end
- def default_persistence=(type)
- $_workflow_default_persistence = type
- end
+ def clear_dangling
+ @dangling_options = []
+ @dangling_option_descriptions = {}
+ @dangling_option_types = {}
+ @dangling_option_defaults = {}
+ @dangling_dependencies = nil
+ end
- def step(step_name, options = nil)
- dependencies, options = case
- when ((String === options or Symbol === options) and %w(string marshal tsv tsv_string).include? options.to_s)
- [nil, {:persistence_type => options}]
- when Hash === options
- [nil, options]
- else
- [options, {}]
- end
+ def task_option(*args)
+ name, description, type, default = *args
+ @dangling_options << name if name
+ @dangling_option_descriptions[name] = description if description
+ @dangling_option_types[name] = type if type
+ @dangling_option_defaults[name] = default if default
+ end
- options = Misc.add_defaults options, :persistence_type => default_persistence
- persistence_type = Misc.process_options options, :persistence_type
- dependencies = Misc.process_options options, :dependencies if options.include? :dependencies
+ def task_dependencies(dependencies)
+ dependencies = [dependencies] unless Array === dependencies
+ @dangling_dependencies = dependencies
+ end
- re = Regexp.new(/(?:^|\/)#{Regexp.quote step_name.to_s}\/.*$/)
+ def process_dangling
+ res = [
+ @dangling_options,
+ Hash[*@dangling_options.zip(@dangling_option_descriptions.values_at(*@dangling_options)).flatten],
+ Hash[*@dangling_options.zip(@dangling_option_types.values_at(*@dangling_options)).flatten],
+ Hash[*@dangling_options.zip(@dangling_option_defaults.values_at(*@dangling_options)).flatten],
+ @dangling_dependencies || @last_task,
+ ]
- @last_step = nil unless defined? @last_step
- @last_persistence_type = nil unless defined? @last_persistence_type
+ clear_dangling
+ res
+ end
- if dependencies.nil? && ! @last_step.nil?
- dependencies = @last_step
- end
- @last_step = step_name
+ def task(name, &block)
+ if Hash === name
+ persistence = name.values.first
+ name = name.keys.first
+ else
+ persistence = :marshal
+ end
- # Generate the Hash definition
- rule_def = case
- when dependencies.nil?
- re
- when String === dependencies || Symbol === dependencies
- {re => lambda{|filename| filename.sub(step_name.to_s, dependencies.to_s) }}
- when Array === dependencies
- {re => lambda{|filename| dependencies.collect{|dep| filename.sub(step_name.to_s, dep.to_s) } }}
- when Proc === dependencies
- {re => dependencies}
- end
+ options, option_descriptions, option_types, option_defaults, dependencies = process_dangling
+ option_descriptions.delete_if do |k,v| v.nil? end
+ option_types.delete_if do |k,v| v.nil? end
+ option_defaults.delete_if do |k,v| v.nil? end
+ task = Task.new name, persistence, options, option_descriptions, option_types, option_defaults, self, dependencies, self, &block
+ tasks[name] = task
+ @last_task = task
+ end
- @last_step = step_name
- last_persistence_type, @last_persistence_type = @last_persistence_type, persistence_type
+ def job(task, jobname, *args)
+ task = tasks[task]
+ raise "Task #{ task } not found" if task.nil?
- rule rule_def do |t|
- Persistence.persist(t.name, "", persistence_type, :persistence_file => t.name) do
- $_workflow_prereq = case
- when (t.prerequisites.nil? or (Array === t.prerequisites and t.prerequisites.empty?))
- nil
- else
- Persistence.persist(t.prerequisites.first, "", last_persistence_type, :persistence_file => t.prerequisites.first) do
- raise "Error, this file should be produced already"
- end
- end
- yield
- end
- end
+ all_options, option_descriptions, option_types, option_defaults = task.recursive_options
+
+ non_optional_arguments = all_options.reject{|option| option_defaults.include? option}
+ run_options = nil
+
+ case
+ when args.length == non_optional_arguments.length
+ run_options = Hash[*non_optional_arguments.zip(args).flatten].merge option_defaults
+ when args.length == non_optional_arguments.length + 1
+ optional_args = args.pop
+ run_options = option_defaults.
+ merge(optional_args).
+ merge(Hash[*non_optional_arguments.zip(args).flatten])
+ else
+ raise "Number of non optional arguments (#{non_optional_arguments * ', '}) does not match given (#{args.flatten * ", "})"
end
+
+ task.job(jobname, run_options)
end
- def self.run(file = :default, workflow_input = nil, &block)
- $_workflow_input = workflow_input
- RakeHelper.run("Runtime", file) do
- yield
- end
+ def run(*args)
+ job(*args).run
end
- def self.load(wf_file, file = :default, workflow_input = nil)
- $_workflow_input = workflow_input
- RakeHelper.run(wf_file, file)
+ def load_job(taskname, job_id)
+ tasks[taskname].load(job_id)
end
-end
+end