bin/gemma-wrapper in bio-gemma-wrapper-0.99.6 vs bin/gemma-wrapper in bio-gemma-wrapper-0.99.7

- old
+ new

@@ -2,39 +2,39 @@ # # gemma-wrapper # Author:: Pjotr Prins # License:: GPL3 # -# Copyright (C) 2017-2022 Pjotr Prins <pjotr.prins@thebird.nl> +# Copyright (C) 2017-2024 Pjotr Prins <pjotr.prins@thebird.nl> USAGE = " GEMMA wrapper example: Simple caching of K computation with - gemma-wrapper -- \\ - -g test/data/input/BXD_geno.txt.gz \\ - -p test/data/input/BXD_pheno.txt \\ + gemma-wrapper -- \ + -g test/data/input/BXD_geno.txt.gz \ + -p test/data/input/BXD_pheno.txt \ -a test/data/input/BXD_snps.txt \ - -gk + -gk > K.json LOCO K computation with caching and JSON output - gemma-wrapper --json --loco -- \\ - -g test/data/input/BXD_geno.txt.gz \\ - -p test/data/input/BXD_pheno.txt \\ - -a test/data/input/BXD_snps.txt \\ + gemma-wrapper --json --loco -- \ + -g test/data/input/BXD_geno.txt.gz \ + -p test/data/input/BXD_pheno.txt \ + -a test/data/input/BXD_snps.txt \ -gk -debug > K.json LMM's using the K's captured in K.json using the --input switch - gemma-wrapper --json --loco --input K.json -- \\ - -g test/data/input/BXD_geno.txt.gz \\ - -p test/data/input/BXD_pheno.txt \\ - -c test/data/input/BXD_covariates2.txt \\ - -a test/data/input/BXD_snps.txt \\ - -lmm 2 -maf 0.1 \\ + gemma-wrapper --json --loco --input K.json -- \ + -g test/data/input/BXD_geno.txt.gz \ + -p test/data/input/BXD_pheno.txt \ + -c test/data/input/BXD_covariates2.txt \ + -a test/data/input/BXD_snps.txt \ + -lmm 9 -maf 0.1 \ -debug > GWA.json Gemma gets used from the path. You can override by setting env GEMMA_COMMAND=path/bin/gemma gemma-wrapper ... @@ -43,10 +43,11 @@ GEMMA_V_MAJOR = 98 GEMMA_V_MINOR = 4 basepath = File.dirname(File.dirname(__FILE__)) $: << File.join(basepath,'lib') +BIN = File.join(basepath,'bin') VERSION_FILENAME=File.join(basepath,'VERSION') version = File.new(VERSION_FILENAME).read.chomp # ---- GEMMA path @@ -67,22 +68,25 @@ hashme = nil require 'digest/sha1' require 'fileutils' require 'optparse' +require 'open3' +require 'socket' # for hostname require 'tempfile' +require 'time' require 'tmpdir' require 'lock' split_at = ARGV.index('--') if split_at gemma_args = ARGV[split_at+1..-1] end -options = { show_help: false, source: 'https://github.com/genetics-statistics/gemma-wrapper', version: version+' (Pjotr Prins)', date: Time.now.to_s, gemma_command: gemma_command, cache_dir: Dir.tmpdir(), quiet: false, permute_phenotypes: false, parallel: nil } +options = { show_help: false, source: 'https://github.com/genetics-statistics/gemma-wrapper', version: version+' (Pjotr Prins)', date: Time.now.to_s, gemma_command: gemma_command, cache_dir: Dir.tmpdir(), quiet: false, permute_phenotypes: false, lmdb: nil, parallel: nil } opts = OptionParser.new do |o| o.banner = "\nUsage: #{File.basename($0)} [options] -- [gemma-options]" o.on('--permutate n', Integer, 'Permutate # times by shuffling phenotypes') do |lst| @@ -97,10 +101,26 @@ o.on('--loco', 'Run full leave-one-chromosome-out (LOCO)') do |b| options[:loco] = b end + o.on('--population NAME', 'Add population identifier to metadata') do |n| + options[:population] = n + end + + o.on('--name NAME', 'Add dataset identifier to metadata') do |n| + options[:name] = n + end + + o.on('--id ID', 'Add identifier to metadata') do |n| + options[:id] = n + end + + o.on('--trait TRAIT', 'Add trait identifier to metadata') do |n| + options[:trait] = n + end + o.on('--chromosomes [1,2,3]',Array,'Run specific chromosomes') do |lst| options[:chromosomes] = lst end o.on('--input filen',String, 'JSON input variables (used for LOCO)') do |filen| @@ -118,18 +138,26 @@ o.on("--force", "Force computation (override cache)") do |q| options[:force] = true end + o.on("--keep", "Keep intermediate files in output") do |q| + options[:keep] = true + end + o.on("--parallel", "Run jobs in parallel") do |b| options[:parallel] = true end o.on("--no-parallel", "Do not run jobs in parallel") do |b| options[:parallel] = false end + o.on("--lmdb", "Generate lmdb output") do |b| + options[:lmdb] = true + end + o.on("--slurm[=opts]",String,"Use slurm PBS for submitting jobs") do |slurm| options[:slurm_opts] = "" options[:slurm] = true if slurm options[:slurm_opts] = slurm @@ -167,15 +195,22 @@ # ---- Output handlers OUTPUT = (options[:json] ? $stderr : $stdout ) record = { warnings: [], errno: 0, debug: [] } +record[:name] = options[:name] if options[:name] +record[:id] = options[:id] if options[:id] +record[:trait] = options[:trait] if options[:trait] +d = DateTime.now +record[:time] = d.strftime("%Y/%m/%d %H:%M") +record[:user] = ENV["USER"] +record[:hostname] = Socket.gethostname require 'json' json_out = lambda do - print record.to_json if options[:json] + record.to_json if options[:json] end # ---- Some error handlers error = lambda do |*msg| if options[:json] @@ -213,21 +248,22 @@ h.map { |k,v| k } end # ---- Start banner GEMMA_K_VERSION=version -GEMMA_K_BANNER = "gemma-wrapper #{version} (Ruby #{RUBY_VERSION}) by Pjotr Prins 2017-2022\n" +GEMMA_K_BANNER = "gemma-wrapper #{version} (Ruby #{RUBY_VERSION}) by Pjotr Prins 2017-2024\n" info.call GEMMA_K_BANNER # Check gemma version begin gemma_command2 = options[:gemma_command] - info.call "NOTE: gemma-wrapper is soon to be replaced" + # info.call "NOTE: gemma-wrapper is soon to be replaced" + debug.call("Invoke #{gemma_command2}") GEMMA_INFO = `#{gemma_command2}` rescue Errno::ENOENT - gemma_command2 = "gemma" + gemma_command2 = "gemma" if not gemma_command2 error.call "<#{gemma_command2}> command not found" end gemma_version_header = GEMMA_INFO.split("\n").grep(/GEMMA|Version/)[0].strip info.call "Using ",gemma_version_header,"\n" @@ -247,11 +283,11 @@ print USAGE exit 1 end if RUBY_VERSION =~ /^1/ - warning "runs on Ruby 2.x only\n" + warning "does not run on Ruby 1.x\n" end # ---- LOCO defaults to parallel if options[:parallel] == nil options[:parallel] = true if options[:loco] @@ -270,10 +306,13 @@ PARALLEL_INFO = `parallel --help` rescue Errno::ENOENT error.call "<parallel> command not found" end parallel_cmds = [] + if not options[:json] + error.call "<parallel> needs --json switch" + end end # ---- Fetch chromosomes from SNP annotation file anno_idx = gemma_args.index '-a' raise "Expected GEMMA -a genotype file switch" if anno_idx == nil @@ -286,27 +325,73 @@ if DO_COMPUTE_GWA and options[:permute_phenotypes] raise "Did not expect GEMMA -p phenotype whith permutations (only use --permutate-phenotypes)" if pheno_idx end +matches = { + chr: [:string, /-loco (\S+) /], + user_time: [:float, /User time \(seconds\): ([\d\.]+)/], + system_time: [:float, /System time \(seconds\): ([\d\.]+)/], + perc_cpu: [:int, /Percent of CPU this job got: (\d+)%/], + wall_clock: [:string, /Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (\S+)/], + ram_usage_gb: [:gb, /Maximum resident set size \(kbytes\): (\d+)/], + command: [:string, /Command being timed: (.+)/] +} + +parse_stats = lambda { |buf| + stats = {} + buf.split("\\n").each do |s| + if s =~ /^\t/ + matches.each do |k,v| + type,m = v + if s =~ m + # $stderr.print $1,s + stats[k] = + case type + when :float + $1.to_f + when :int + $1.to_i + when :gb + (($1.to_f)/1048576.0).round(3) + else + $1 + end + end + end + end + end + stats +} + +run_stat = {} + execute = lambda { |cmd| info.call("Executing: #{cmd}") err = 0 - if not options[:debug] - # send output to stderr line by line - IO.popen("#{cmd}") do |io| - while s = io.gets - $stderr.print s - end - io.close - err = $?.to_i - end - else - $stderr.print `#{cmd}` - err = $?.to_i + stdout_buf = "" + stderr_buf = "" + stats = {} + Open3.popen3("time -v #{cmd}") do |stdin,stdout,stderr,wait_thr| + stderr_buf = stderr.read + stdout_buf = stdout.read + stats = parse_stats.call(stderr_buf) + stdin.close + stdout.close + stderr.close + err = wait_thr.value end - err + $stderr.print(stderr_buf) if options[:debug] + if err and err != 0 + $stderr.print(stdout_buf) + $stderr.print(stderr_buf) if not options[:debug] + $stderr.print "FATAL ERROR: gemma-wrapper bailed out with #{err}\n" + # sleep 10_000 + $stderr.print Kernel.caller().join("\n") + exit 1 + end + return err,stats } compute_hash = lambda do | phenofn = nil | # Compute a HASH on the inputs error.call "Hash is empty" if hashme == nil or hashme == [] @@ -317,10 +402,11 @@ else hashme end debug.call("Hashing on ",hm) hm.each do | item | + # if entry is a file use the hash of its content, otherwise just the entry itself if File.file?(item) hashes << Digest::SHA1.hexdigest(File.read(item)) debug.call [item,hashes.last] else hashes << item @@ -341,19 +427,20 @@ else GEMMA_ARGS_HASH end HASH = compute_hash.call() +options[:compute_hash_on] = hashme options[:hash] = HASH at_exit do Lock.release(HASH) end Lock.create(HASH) # this will wait for a lock to expire -joblog = options[:cache_dir]+"/"+HASH+"-parallel.log" +JOBLOG = HASH+"-parallel.log" # Create cache dir FileUtils::mkdir_p options[:cache_dir] Dir.mktmpdir do |tmpdir| # tmpdir for GEMMA output @@ -363,11 +450,11 @@ GEMMA_ARGS = gemma_args debug.call "Options: ",options,"\n" if !options[:quiet] invoke_gemma = lambda do |extra_args, cache_hit = false, chr = "full", permutation = 1| - cmd = "#{gemma_command2} #{GEMMA_ARGS.join(' ')} #{extra_args.join(' ')}" + cmd = "time -v #{gemma_command2} #{extra_args.join(' ')} #{GEMMA_ARGS.join(' ')}" record[:gemma_command] = cmd return if cache_hit if options[:slurm] info.call cmd hashi = HASH @@ -393,11 +480,11 @@ info.call("Would have invoked: ",cmd) elsif options[:parallel] info.call("Add parallel job: ",cmd) parallel_cmds << cmd else - err = execute.call(cmd) + err,stats = execute.call(cmd) end err else if options[:dry_run] info.call("Would have invoked ",cmd) @@ -414,10 +501,12 @@ json_out.call raise "exit on GEMMA error #{errno}" end end +create_archive = false + # Takes the hash value and checks whether the (output) file exists # returns datafn, logfn, cache_hit cache = lambda do | chr, ext, h=HASH, permutation=0 | inject = (chr==nil ? "" : ".#{chr}" )+ext hashi = (chr==nil ? h : h+inject) @@ -425,14 +514,19 @@ # for chr 3 and permutation 1 forms something like # /tmp/1b700-a996f.3.cXX.txt.1.log.txt logfn = prefix+".log.txt" datafn = prefix+ext record[:files] ||= [] - record[:files].push [chr,logfn,datafn] + log_basefn = File.basename(logfn) + data_basefn = File.basename(datafn) + log_tmpfn = tmpdir+"/"+log_basefn + data_tmpfn = tmpdir+"/"+data_basefn + record[:files].push [chr,log_basefn,data_basefn] if !options[:force] - if File.exist? logfn and File.exist? datafn - if File.read(logfn).include? "total computation time" + info.call "Checking for #{data_tmpfn}" + if File.exist? log_tmpfn and File.exist? data_tmpfn + if File.read(log_tmpfn).include? "total computation time" record[:cache_hit] = true info.call "#{logfn} CACHE HIT!\n" return hashi, true end end @@ -446,12 +540,14 @@ ext = case (GEMMA_ARGS[GEMMA_ARGS.index('-gk')+1]).to_i when 0,1 then '.cXX.txt' when 2 then '.sXX.txt' else error.call "Unknown kinship type" end + # ---- check cache: hashi, cache_hit = cache.call chr,ext if not cache_hit + create_archive = true if chr != nil invoke_gemma.call [ '-loco', chr, '-o', hashi ], cache_hit else invoke_gemma.call [ '-o', HASH ], cache_hit end @@ -464,12 +560,14 @@ error.call "Do not use the GEMMA -k switch with gemma-wrapper - it is automatic!" if GEMMA_ARGS.include? '-k' # K is automatic # Update hash for each permutation hash = compute_hash.call(pfn) hashi, cache_hit = cache.call(chr,".assoc.txt",hash,permutation) if not cache_hit - args = [ '-k', kfn, '-o', hashi ] + create_archive = true + args = [] args << [ '-loco', chr ] if chr != nil + args << [ '-k', kfn, '-o', hashi ] args << [ '-p', pfn ] if pfn invoke_gemma.call args,false,chr,permutation end end @@ -478,23 +576,36 @@ if options[:chromosomes] CHROMOSOMES = options[:chromosomes] end end +json_in = nil + if DO_COMPUTE_KINSHIP # compute K + ARCHIVE = options[:cache_dir]+"/"+HASH+"-gemma-cXX.tar.xz" + + if File.exist? ARCHIVE and not options[:force] + info.call "Unpack archive #{ARCHIVE}!" + execute.call "tar xJf #{ARCHIVE} -C #{tmpdir}" + end info.call CHROMOSOMES if LOCO CHROMOSOMES.each do |chr| - info.call "LOCO for ",chr + info.call "Compute kinship LOCO for chr ",chr kinship.call(chr) end else kinship.call # no LOCO end else # DO_COMPUTE_GWA + ARCHIVE = options[:cache_dir]+"/"+HASH+"-gemma-GWA.tar.xz" + if File.exist? ARCHIVE and not options[:force] + info.call "Unpack archive #{ARCHIVE}!" + execute.call "env XZ_OPT='-T0' tar xJf #{ARCHIVE} -C #{tmpdir}" + end begin json_in = JSON.parse(File.read(options[:input])) rescue TypeError raise "Missing JSON input file?" end @@ -502,16 +613,24 @@ pfn = options[:permute_phenotypes] # can be nil if LOCO k_files = json_in["files"].map { |rec| [rec[0],rec[2]] } k_files.each do | chr, kfn | # call a GWA for each chromosome - gwas.call(chr,kfn,pfn) + + kfn2 = options[:cache_dir]+"/"+kfn + if not File.exist?(kfn2) and json_in["archive"] + # we aim to unpack the archive once on reuse + archive_grm = options[:cache_dir]+"/"+json_in["archive"] + execute.call "env XZ_OPT='-T0' tar xJf #{archive_grm} -C #{options[:cache_dir]}" + end + + gwas.call(chr,kfn2,pfn) end else kfn = json_in["files"][0][2] CHROMOSOMES.each do | chr | - gwas.call(chr,kfn,pfn) + gwas.call(chr,tmpdir+"/"+kfn,pfn) end end # Permute if options[:permutate] ps = [] @@ -560,49 +679,107 @@ end # ---- Invoke parallel if options[:parallel] # parallel_cmds = ["echo 1","sleep 1 && echo 2", "false", "echo 3"] + joblog = tmpdir+"/"+JOBLOG Tempfile.open("commands.txt") do |f| cmdfn = f.path File.open(cmdfn,"w") do |f| parallel_cmds.each do |c| f.puts(c) end end cmd = "cat \"#{cmdfn}\"" - err = execute.call(cmd+"|parallel --joblog #{joblog}") # first try optimistically to run all jobs in parallel + debug.call("tmpdir=#{tmpdir}") + err,stats = execute.call(cmd+"|parallel --results #{tmpdir} --joblog #{joblog}") # first try optimistically to run all jobs in parallel if err != 0 - [16,8,4,1].each do |jobs| + [4,1].each do |jobs| info.call("Failed to complete parallel run -- retrying with smaller RAM footprint!") - err = execute.call(cmd+"|parallel -j #{jobs} --resume --joblog #{joblog}") + err,stats = execute.call(cmd+"|parallel -j #{jobs} --results #{tmpdir} --resume --joblog #{joblog}") break if err == 0 end if err != 0 info.call("Parallel run failed!") debug.call("Job log is: ",File.read(joblog)) - # Remove remaining files - FileUtils.mv joblog, joblog+".bak", verbose: false, force: true - FileUtils.rm_rf("#{tmpdir}/*", secure: true) exit err end end end info.call("Run successful!") - FileUtils.mv joblog, joblog+".bak", verbose: false, force: true end -json_out.call -# copy all output files to the cache_dir. If a file exists only emit a warning -Dir.glob("*.txt", base: tmpdir) do | fn | - source = tmpdir + "/" + fn - dest = options[:cache_dir] + "/" + fn - if not File.exist?(dest) or options[:force] - info.call "Move #{source} to #{dest}" - FileUtils.mv source, dest, verbose: false - else - warning.call "File #{dest} already exists. Not overwriting" +# Collect stats from parallel run + +run_stats = {} +$stderr.print "STATS" +Dir.glob(tmpdir+'/*/*' ).each do | dir | + File.open("#{dir}/stderr") { |f| + run_stat = parse_stats.call(f.read) + chr = run_stat[:chr] + run_stats[chr] = run_stat + } +end +# Now add up the stats +user_time = 0.0 +system_time = 0.0 +wall_clock = "0" +ram_usage_gb = 0.0 +run_stats.each do | k, v | + wall_clock=v[:wall_clock] if v[:wall_clock]>wall_clock + ram_usage_gb += v[:ram_usage_gb] + user_time += v[:user_time] + system_time += v[:system_time] +end + +record[:user_time] = user_time +record[:system_time] = system_time +record[:wall_clock] = wall_clock +record[:ram_usage_gb] = ram_usage_gb.round(2) +record[:run_stats] = run_stats + +if create_archive + if DO_COMPUTE_GWA + LMDB = tmpdir+"/"+HASH+'.mdb' + # create lmdb database - we call out into a python script for that. + # first create a JSON record + + meta = { + type: "gemma-wrapper", + version: version, + population: options[:population], + name: options[:name], + trait: options[:trait], + url: "https://genenetwork.org/show_trait?trait_id="+options[:trait]+"&dataset="+options[:name], + archive_GRM: json_in["archive"], + archive_GWA: File.basename(ARCHIVE), + } + if options[:id] and options[:id] =~ /,/ # this is GN specific + dataid,probesetid,probesetfreezeid = options[:id].split(",") + meta[:dataid] = dataid.to_i + meta[:probesetid] = probesetid.to_i + meta[:probesetfreezeid] = probesetfreezeid.to_i + end + record[:meta] = meta + metafn = tmpdir+"/"+HASH+"-meta.json" + File.write(metafn,record.to_json) + # sleep 10_000 + if options[:lmdb] + File.unlink(LMDB) if File.exist?(LMDB) # removed any cached lmdb + execute.call "python3 #{BIN}/gemma2lmdb.py --db=#{LMDB} --meta=#{metafn} #{tmpdir}/*assoc.txt" + end + if not options[:keep] + execute.call "rm -f #{tmpdir}/1/*/* #{tmpdir}/*.txt #{tmpdir}/*.log #{tmpdir}/*.mdb-lock" # remove GEMMA output files + FileUtils.rm_rf("#{tmpdir}/1", secure: true) + end end + File.write(tmpdir+"/"+HASH+"-gemma-wrapper-output.json",json_out.call) + info.call "Creating archive #{ARCHIVE}..." + execute.call "env XZ_OPT='-T0' tar -cvJf #{ARCHIVE} -C #{tmpdir} ." end end # tmpdir + +record[:archive] = File.basename(ARCHIVE) + +print json_out.call