lib/rbbt/workflow/schedule.rb in rbbt-util-5.26.6 vs lib/rbbt/workflow/schedule.rb in rbbt-util-5.26.8

- old
+ new

@@ -1,8 +1,8 @@ class Step class ResourceManager - class NotEoughResources < Exception + class NotEnoughResources < Exception end attr_accessor :cpus, :memory def initialize(cpus = nil, memory = nil) @cpus = cpus @@ -13,11 +13,11 @@ def allocate(cpus = nil, memory = nil, &block) RbbtSemaphore.synchronize(@semaphore) do if (@cpus && cpus && @cpus < cups) || (@memory && memory && @memory < memory) - raise NotEoughResources + raise NotEnoughResources end begin @cpus -= cpus @memory -= memory yield @@ -59,140 +59,147 @@ jdeps += job.inputs.flatten.select{|i| Step === i} jdeps.reject!{|dep| dep.done? } @job_deps[job.path] = [] jdeps.each do |dep| - next if dep.done? + #next if dep.done? @dep_jobs[dep.path] ||= [] @job_deps[job.path] << dep.path @dep_jobs[dep.path] << job.path with_deps << dep unless @job_deps.include? dep.path end end - - def self.ready + def ready @job_deps.select do |jobp,deps| (@missing & deps).empty? end.collect{|jobp,deps| jobp} end - def self.next + def used + iii @dep_jobs + @dep_jobs.select do |dep,jobs| + iif [dep, @missing.to_a, jobs] + (@missing & jobs).empty? + end.keys + end + + def next priorities = {} @jobs.each do |job| - priorities = 1 + priorities[job.path] = 1 end @missing.each do |jobp| end - @dep_jobsb + ready.first end end end - def self._priorities(jobs) - job_level = {} - jobs.each do |job| - job_level[job.path] = 1.0 - end + #def self._priorities(jobs) + # job_level = {} + # jobs.each do |job| + # job_level[job.path] = 1.0 + # end - with_deps = jobs.dup - dep_jobs = {} - job_deps = {} - while with_deps.any? - job = with_deps.pop - level = job_level[job.path] - job_deps[job.path] = [] - jdeps = job.dependencies - jdeps += job.inputs.flatten.select{|i| Step === i} + # with_deps = jobs.dup + # dep_jobs = {} + # job_deps = {} + # while with_deps.any? + # job = with_deps.pop + # level = job_level[job.path] + # job_deps[job.path] = [] + # jdeps = job.dependencies + # jdeps += job.inputs.flatten.select{|i| Step === i} - jdeps.reject!{|dep| dep.done? } - jdeps.each do |dep| - next if dep.done? - dep_jobs[dep.path] ||= [] - job_level[dep.path] = level / (10 * jdeps.length) if job_level[dep.path].nil? || job_level[dep.path] < level / (10 * jdeps.length) - job_deps[job.path] << dep.path - dep_jobs[dep.path] << job.path - with_deps << dep unless job_deps.include? dep.path - end - end - [job_level, job_deps, dep_jobs] - end + # jdeps.reject!{|dep| dep.done? } + # jdeps.each do |dep| + # next if dep.done? + # dep_jobs[dep.path] ||= [] + # job_level[dep.path] = level / (10 * jdeps.length) if job_level[dep.path].nil? || job_level[dep.path] < level / (10 * jdeps.length) + # job_deps[job.path] << dep.path + # dep_jobs[dep.path] << job.path + # with_deps << dep unless job_deps.include? dep.path + # end + # end + # [job_level, job_deps, dep_jobs] + #end - def self.produce_jobs(jobs, cpus, step_cpus = {}) - require 'fc' + #def self.produce_jobs(jobs, cpus, step_cpus = {}) + # require 'fc' - step_cpus = IndiferentHash.setup(step_cpus || {}) + # step_cpus = IndiferentHash.setup(step_cpus || {}) - deps = [] + # deps = [] - jobs = [jobs] unless Array === jobs + # jobs = [jobs] unless Array === jobs - job_level, job_deps, dep_jobs = self._priorities(jobs) + # job_level, job_deps, dep_jobs = self._priorities(jobs) - jobps = {} - (jobs + jobs.collect{|job| job.rec_dependencies}).flatten.uniq.each do |job| - jobps[job.path] = job - end + # jobps = {} + # (jobs + jobs.collect{|job| job.rec_dependencies}).flatten.uniq.each do |job| + # jobps[job.path] = job + # end - prio_queue = FastContainers::PriorityQueue.new :max + # prio_queue = FastContainers::PriorityQueue.new :max - job_deps.each do |jobp,depps| - next if depps.any? - level = job_level[jobp] + # job_deps.each do |jobp,depps| + # next if depps.any? + # level = job_level[jobp] - prio_queue.push(jobp, level) - end + # prio_queue.push(jobp, level) + # end - queue = RbbtProcessQueue.new cpus + # queue = RbbtProcessQueue.new cpus - missing = job_deps.keys - queue.callback do |jobp| - Log.info "Done: #{jobp}" - missing -= [jobp] + # missing = job_deps.keys + # queue.callback do |jobp| + # Log.info "Done: #{jobp}" + # missing -= [jobp] - job_level, job_deps, dep_jobs = self._priorities(jobs) + # job_level, job_deps, dep_jobs = self._priorities(jobs) - parentsp = dep_jobs[jobp] + # parentsp = dep_jobs[jobp] - parentsp.each do |parentp| - next unless job_deps[parentp].include? jobp - job_deps[parentp] -= [jobp] - if job_deps[parentp].empty? - level = job_level[parentp] - prio_queue.push(parentp, level ) - end - end if parentsp - prio_queue_new = FastContainers::PriorityQueue.new :max - while prio_queue.any? - elem = prio_queue.pop - prio_queue_new.push(elem, job_level[elem]) - end - prio_queue = prio_queue_new - end - - queue.init do |jobp| - Log.info "Processing: #{jobp}" - job = jobps[jobp] - job_cpus = step_cpus[job.task_name] || 1 - sleep 0.5 - #job.produce - jobp - end + # parentsp.each do |parentp| + # next unless job_deps[parentp].include? jobp + # job_deps[parentp] -= [jobp] + # if job_deps[parentp].empty? + # level = job_level[parentp] + # prio_queue.push(parentp, level ) + # end + # end if parentsp + # prio_queue_new = FastContainers::PriorityQueue.new :max + # while prio_queue.any? + # elem = prio_queue.pop + # prio_queue_new.push(elem, job_level[elem]) + # end + # prio_queue = prio_queue_new + # end + # + # queue.init do |jobp| + # Log.info "Processing: #{jobp}" + # job = jobps[jobp] + # job_cpus = step_cpus[job.task_name] || 1 + # sleep 0.5 + # #job.produce + # jobp + # end - while missing.any? - while prio_queue.empty? && missing.any? - sleep 1 - end - break if missing.empty? - jobp = prio_queue.pop - queue.process jobp - end + # while missing.any? + # while prio_queue.empty? && missing.any? + # sleep 1 + # end + # break if missing.empty? + # jobp = prio_queue.pop + # queue.process jobp + # end - queue.join - end + # queue.join + #end end if __FILE__ == $0 require 'rbbt/workflow' @@ -217,11 +224,15 @@ Rbbt::Config.load_file Rbbt.etc.config_profile.HTS.find Workflow.require_workflow "Sample" Workflow.require_workflow "HTS" jobs = [] - jobs << Sample.job(:mutect2, "QUINTANA-15") - jobs << Sample.job(:mutect2, "QUINTANA-25") - jobs << Sample.job(:mutect2, "QUINTANA-28") +# jobs << Sample.job(:mutect2, "QUINTANA-15") +# jobs << Sample.job(:mutect2, "QUINTANA-25") +# jobs << Sample.job(:mutect2, "QUINTANA-22") + jobs << Sample.job(:mutect2, "small") sched = Step::Scheduler.new(jobs, 3) + iii sched.ready + iii sched.used + iii sched.next end