bin/gemma-wrapper in bio-gemma-wrapper-0.99.6 vs bin/gemma-wrapper in bio-gemma-wrapper-0.99.7
- old
+ new
@@ -2,39 +2,39 @@
#
# gemma-wrapper
# Author:: Pjotr Prins
# License:: GPL3
#
-# Copyright (C) 2017-2022 Pjotr Prins <pjotr.prins@thebird.nl>
+# Copyright (C) 2017-2024 Pjotr Prins <pjotr.prins@thebird.nl>
USAGE = "
GEMMA wrapper example:
Simple caching of K computation with
- gemma-wrapper -- \\
- -g test/data/input/BXD_geno.txt.gz \\
- -p test/data/input/BXD_pheno.txt \\
+ gemma-wrapper -- \
+ -g test/data/input/BXD_geno.txt.gz \
+ -p test/data/input/BXD_pheno.txt \
-a test/data/input/BXD_snps.txt \
- -gk
+ -gk > K.json
LOCO K computation with caching and JSON output
- gemma-wrapper --json --loco -- \\
- -g test/data/input/BXD_geno.txt.gz \\
- -p test/data/input/BXD_pheno.txt \\
- -a test/data/input/BXD_snps.txt \\
+ gemma-wrapper --json --loco -- \
+ -g test/data/input/BXD_geno.txt.gz \
+ -p test/data/input/BXD_pheno.txt \
+ -a test/data/input/BXD_snps.txt \
-gk -debug > K.json
LMM's using the K's captured in K.json using the --input switch
- gemma-wrapper --json --loco --input K.json -- \\
- -g test/data/input/BXD_geno.txt.gz \\
- -p test/data/input/BXD_pheno.txt \\
- -c test/data/input/BXD_covariates2.txt \\
- -a test/data/input/BXD_snps.txt \\
- -lmm 2 -maf 0.1 \\
+ gemma-wrapper --json --loco --input K.json -- \
+ -g test/data/input/BXD_geno.txt.gz \
+ -p test/data/input/BXD_pheno.txt \
+ -c test/data/input/BXD_covariates2.txt \
+ -a test/data/input/BXD_snps.txt \
+ -lmm 9 -maf 0.1 \
-debug > GWA.json
Gemma gets used from the path. You can override by setting
env GEMMA_COMMAND=path/bin/gemma gemma-wrapper ...
@@ -43,10 +43,11 @@
GEMMA_V_MAJOR = 98
GEMMA_V_MINOR = 4
basepath = File.dirname(File.dirname(__FILE__))
$: << File.join(basepath,'lib')
+BIN = File.join(basepath,'bin')
VERSION_FILENAME=File.join(basepath,'VERSION')
version = File.new(VERSION_FILENAME).read.chomp
# ---- GEMMA path
@@ -67,22 +68,25 @@
hashme = nil
require 'digest/sha1'
require 'fileutils'
require 'optparse'
+require 'open3'
+require 'socket' # for hostname
require 'tempfile'
+require 'time'
require 'tmpdir'
require 'lock'
split_at = ARGV.index('--')
if split_at
gemma_args = ARGV[split_at+1..-1]
end
-options = { show_help: false, source: 'https://github.com/genetics-statistics/gemma-wrapper', version: version+' (Pjotr Prins)', date: Time.now.to_s, gemma_command: gemma_command, cache_dir: Dir.tmpdir(), quiet: false, permute_phenotypes: false, parallel: nil }
+options = { show_help: false, source: 'https://github.com/genetics-statistics/gemma-wrapper', version: version+' (Pjotr Prins)', date: Time.now.to_s, gemma_command: gemma_command, cache_dir: Dir.tmpdir(), quiet: false, permute_phenotypes: false, lmdb: nil, parallel: nil }
opts = OptionParser.new do |o|
o.banner = "\nUsage: #{File.basename($0)} [options] -- [gemma-options]"
o.on('--permutate n', Integer, 'Permutate # times by shuffling phenotypes') do |lst|
@@ -97,10 +101,26 @@
o.on('--loco', 'Run full leave-one-chromosome-out (LOCO)') do |b|
options[:loco] = b
end
+ o.on('--population NAME', 'Add population identifier to metadata') do |n|
+ options[:population] = n
+ end
+
+ o.on('--name NAME', 'Add dataset identifier to metadata') do |n|
+ options[:name] = n
+ end
+
+ o.on('--id ID', 'Add identifier to metadata') do |n|
+ options[:id] = n
+ end
+
+ o.on('--trait TRAIT', 'Add trait identifier to metadata') do |n|
+ options[:trait] = n
+ end
+
o.on('--chromosomes [1,2,3]',Array,'Run specific chromosomes') do |lst|
options[:chromosomes] = lst
end
o.on('--input filen',String, 'JSON input variables (used for LOCO)') do |filen|
@@ -118,18 +138,26 @@
o.on("--force", "Force computation (override cache)") do |q|
options[:force] = true
end
+ o.on("--keep", "Keep intermediate files in output") do |q|
+ options[:keep] = true
+ end
+
o.on("--parallel", "Run jobs in parallel") do |b|
options[:parallel] = true
end
o.on("--no-parallel", "Do not run jobs in parallel") do |b|
options[:parallel] = false
end
+ o.on("--lmdb", "Generate lmdb output") do |b|
+ options[:lmdb] = true
+ end
+
o.on("--slurm[=opts]",String,"Use slurm PBS for submitting jobs") do |slurm|
options[:slurm_opts] = ""
options[:slurm] = true
if slurm
options[:slurm_opts] = slurm
@@ -167,15 +195,22 @@
# ---- Output handlers
OUTPUT = (options[:json] ? $stderr : $stdout )
record = { warnings: [], errno: 0, debug: [] }
+record[:name] = options[:name] if options[:name]
+record[:id] = options[:id] if options[:id]
+record[:trait] = options[:trait] if options[:trait]
+d = DateTime.now
+record[:time] = d.strftime("%Y/%m/%d %H:%M")
+record[:user] = ENV["USER"]
+record[:hostname] = Socket.gethostname
require 'json'
json_out = lambda do
- print record.to_json if options[:json]
+ record.to_json if options[:json]
end
# ---- Some error handlers
error = lambda do |*msg|
if options[:json]
@@ -213,21 +248,22 @@
h.map { |k,v| k }
end
# ---- Start banner
GEMMA_K_VERSION=version
-GEMMA_K_BANNER = "gemma-wrapper #{version} (Ruby #{RUBY_VERSION}) by Pjotr Prins 2017-2022\n"
+GEMMA_K_BANNER = "gemma-wrapper #{version} (Ruby #{RUBY_VERSION}) by Pjotr Prins 2017-2024\n"
info.call GEMMA_K_BANNER
# Check gemma version
begin
gemma_command2 = options[:gemma_command]
- info.call "NOTE: gemma-wrapper is soon to be replaced"
+ # info.call "NOTE: gemma-wrapper is soon to be replaced"
+ debug.call("Invoke #{gemma_command2}")
GEMMA_INFO = `#{gemma_command2}`
rescue Errno::ENOENT
- gemma_command2 = "gemma"
+ gemma_command2 = "gemma" if not gemma_command2
error.call "<#{gemma_command2}> command not found"
end
gemma_version_header = GEMMA_INFO.split("\n").grep(/GEMMA|Version/)[0].strip
info.call "Using ",gemma_version_header,"\n"
@@ -247,11 +283,11 @@
print USAGE
exit 1
end
if RUBY_VERSION =~ /^1/
- warning "runs on Ruby 2.x only\n"
+ warning "does not run on Ruby 1.x\n"
end
# ---- LOCO defaults to parallel
if options[:parallel] == nil
options[:parallel] = true if options[:loco]
@@ -270,10 +306,13 @@
PARALLEL_INFO = `parallel --help`
rescue Errno::ENOENT
error.call "<parallel> command not found"
end
parallel_cmds = []
+ if not options[:json]
+ error.call "<parallel> needs --json switch"
+ end
end
# ---- Fetch chromosomes from SNP annotation file
anno_idx = gemma_args.index '-a'
raise "Expected GEMMA -a genotype file switch" if anno_idx == nil
@@ -286,27 +325,73 @@
if DO_COMPUTE_GWA and options[:permute_phenotypes]
raise "Did not expect GEMMA -p phenotype whith permutations (only use --permutate-phenotypes)" if pheno_idx
end
+matches = {
+ chr: [:string, /-loco (\S+) /],
+ user_time: [:float, /User time \(seconds\): ([\d\.]+)/],
+ system_time: [:float, /System time \(seconds\): ([\d\.]+)/],
+ perc_cpu: [:int, /Percent of CPU this job got: (\d+)%/],
+ wall_clock: [:string, /Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (\S+)/],
+ ram_usage_gb: [:gb, /Maximum resident set size \(kbytes\): (\d+)/],
+ command: [:string, /Command being timed: (.+)/]
+}
+
+parse_stats = lambda { |buf|
+ stats = {}
+ buf.split("\\n").each do |s|
+ if s =~ /^\t/
+ matches.each do |k,v|
+ type,m = v
+ if s =~ m
+ # $stderr.print $1,s
+ stats[k] =
+ case type
+ when :float
+ $1.to_f
+ when :int
+ $1.to_i
+ when :gb
+ (($1.to_f)/1048576.0).round(3)
+ else
+ $1
+ end
+ end
+ end
+ end
+ end
+ stats
+}
+
+run_stat = {}
+
execute = lambda { |cmd|
info.call("Executing: #{cmd}")
err = 0
- if not options[:debug]
- # send output to stderr line by line
- IO.popen("#{cmd}") do |io|
- while s = io.gets
- $stderr.print s
- end
- io.close
- err = $?.to_i
- end
- else
- $stderr.print `#{cmd}`
- err = $?.to_i
+ stdout_buf = ""
+ stderr_buf = ""
+ stats = {}
+ Open3.popen3("time -v #{cmd}") do |stdin,stdout,stderr,wait_thr|
+ stderr_buf = stderr.read
+ stdout_buf = stdout.read
+ stats = parse_stats.call(stderr_buf)
+ stdin.close
+ stdout.close
+ stderr.close
+ err = wait_thr.value
end
- err
+ $stderr.print(stderr_buf) if options[:debug]
+ if err and err != 0
+ $stderr.print(stdout_buf)
+ $stderr.print(stderr_buf) if not options[:debug]
+ $stderr.print "FATAL ERROR: gemma-wrapper bailed out with #{err}\n"
+ # sleep 10_000
+ $stderr.print Kernel.caller().join("\n")
+ exit 1
+ end
+ return err,stats
}
compute_hash = lambda do | phenofn = nil |
# Compute a HASH on the inputs
error.call "Hash is empty" if hashme == nil or hashme == []
@@ -317,10 +402,11 @@
else
hashme
end
debug.call("Hashing on ",hm)
hm.each do | item |
+ # if entry is a file use the hash of its content, otherwise just the entry itself
if File.file?(item)
hashes << Digest::SHA1.hexdigest(File.read(item))
debug.call [item,hashes.last]
else
hashes << item
@@ -341,19 +427,20 @@
else
GEMMA_ARGS_HASH
end
HASH = compute_hash.call()
+options[:compute_hash_on] = hashme
options[:hash] = HASH
at_exit do
Lock.release(HASH)
end
Lock.create(HASH) # this will wait for a lock to expire
-joblog = options[:cache_dir]+"/"+HASH+"-parallel.log"
+JOBLOG = HASH+"-parallel.log"
# Create cache dir
FileUtils::mkdir_p options[:cache_dir]
Dir.mktmpdir do |tmpdir| # tmpdir for GEMMA output
@@ -363,11 +450,11 @@
GEMMA_ARGS = gemma_args
debug.call "Options: ",options,"\n" if !options[:quiet]
invoke_gemma = lambda do |extra_args, cache_hit = false, chr = "full", permutation = 1|
- cmd = "#{gemma_command2} #{GEMMA_ARGS.join(' ')} #{extra_args.join(' ')}"
+ cmd = "time -v #{gemma_command2} #{extra_args.join(' ')} #{GEMMA_ARGS.join(' ')}"
record[:gemma_command] = cmd
return if cache_hit
if options[:slurm]
info.call cmd
hashi = HASH
@@ -393,11 +480,11 @@
info.call("Would have invoked: ",cmd)
elsif options[:parallel]
info.call("Add parallel job: ",cmd)
parallel_cmds << cmd
else
- err = execute.call(cmd)
+ err,stats = execute.call(cmd)
end
err
else
if options[:dry_run]
info.call("Would have invoked ",cmd)
@@ -414,10 +501,12 @@
json_out.call
raise "exit on GEMMA error #{errno}"
end
end
+create_archive = false
+
# Takes the hash value and checks whether the (output) file exists
# returns datafn, logfn, cache_hit
cache = lambda do | chr, ext, h=HASH, permutation=0 |
inject = (chr==nil ? "" : ".#{chr}" )+ext
hashi = (chr==nil ? h : h+inject)
@@ -425,14 +514,19 @@
# for chr 3 and permutation 1 forms something like
# /tmp/1b700-a996f.3.cXX.txt.1.log.txt
logfn = prefix+".log.txt"
datafn = prefix+ext
record[:files] ||= []
- record[:files].push [chr,logfn,datafn]
+ log_basefn = File.basename(logfn)
+ data_basefn = File.basename(datafn)
+ log_tmpfn = tmpdir+"/"+log_basefn
+ data_tmpfn = tmpdir+"/"+data_basefn
+ record[:files].push [chr,log_basefn,data_basefn]
if !options[:force]
- if File.exist? logfn and File.exist? datafn
- if File.read(logfn).include? "total computation time"
+ info.call "Checking for #{data_tmpfn}"
+ if File.exist? log_tmpfn and File.exist? data_tmpfn
+ if File.read(log_tmpfn).include? "total computation time"
record[:cache_hit] = true
info.call "#{logfn} CACHE HIT!\n"
return hashi, true
end
end
@@ -446,12 +540,14 @@
ext = case (GEMMA_ARGS[GEMMA_ARGS.index('-gk')+1]).to_i
when 0,1 then '.cXX.txt'
when 2 then '.sXX.txt'
else error.call "Unknown kinship type"
end
+ # ---- check cache:
hashi, cache_hit = cache.call chr,ext
if not cache_hit
+ create_archive = true
if chr != nil
invoke_gemma.call [ '-loco', chr, '-o', hashi ], cache_hit
else
invoke_gemma.call [ '-o', HASH ], cache_hit
end
@@ -464,12 +560,14 @@
error.call "Do not use the GEMMA -k switch with gemma-wrapper - it is automatic!" if GEMMA_ARGS.include? '-k' # K is automatic
# Update hash for each permutation
hash = compute_hash.call(pfn)
hashi, cache_hit = cache.call(chr,".assoc.txt",hash,permutation)
if not cache_hit
- args = [ '-k', kfn, '-o', hashi ]
+ create_archive = true
+ args = []
args << [ '-loco', chr ] if chr != nil
+ args << [ '-k', kfn, '-o', hashi ]
args << [ '-p', pfn ] if pfn
invoke_gemma.call args,false,chr,permutation
end
end
@@ -478,23 +576,36 @@
if options[:chromosomes]
CHROMOSOMES = options[:chromosomes]
end
end
+json_in = nil
+
if DO_COMPUTE_KINSHIP
# compute K
+ ARCHIVE = options[:cache_dir]+"/"+HASH+"-gemma-cXX.tar.xz"
+
+ if File.exist? ARCHIVE and not options[:force]
+ info.call "Unpack archive #{ARCHIVE}!"
+ execute.call "tar xJf #{ARCHIVE} -C #{tmpdir}"
+ end
info.call CHROMOSOMES
if LOCO
CHROMOSOMES.each do |chr|
- info.call "LOCO for ",chr
+ info.call "Compute kinship LOCO for chr ",chr
kinship.call(chr)
end
else
kinship.call # no LOCO
end
else
# DO_COMPUTE_GWA
+ ARCHIVE = options[:cache_dir]+"/"+HASH+"-gemma-GWA.tar.xz"
+ if File.exist? ARCHIVE and not options[:force]
+ info.call "Unpack archive #{ARCHIVE}!"
+ execute.call "env XZ_OPT='-T0' tar xJf #{ARCHIVE} -C #{tmpdir}"
+ end
begin
json_in = JSON.parse(File.read(options[:input]))
rescue TypeError
raise "Missing JSON input file?"
end
@@ -502,16 +613,24 @@
pfn = options[:permute_phenotypes] # can be nil
if LOCO
k_files = json_in["files"].map { |rec| [rec[0],rec[2]] }
k_files.each do | chr, kfn | # call a GWA for each chromosome
- gwas.call(chr,kfn,pfn)
+
+ kfn2 = options[:cache_dir]+"/"+kfn
+ if not File.exist?(kfn2) and json_in["archive"]
+ # we aim to unpack the archive once on reuse
+ archive_grm = options[:cache_dir]+"/"+json_in["archive"]
+ execute.call "env XZ_OPT='-T0' tar xJf #{archive_grm} -C #{options[:cache_dir]}"
+ end
+
+ gwas.call(chr,kfn2,pfn)
end
else
kfn = json_in["files"][0][2]
CHROMOSOMES.each do | chr |
- gwas.call(chr,kfn,pfn)
+ gwas.call(chr,tmpdir+"/"+kfn,pfn)
end
end
# Permute
if options[:permutate]
ps = []
@@ -560,49 +679,107 @@
end
# ---- Invoke parallel
if options[:parallel]
# parallel_cmds = ["echo 1","sleep 1 && echo 2", "false", "echo 3"]
+ joblog = tmpdir+"/"+JOBLOG
Tempfile.open("commands.txt") do |f|
cmdfn = f.path
File.open(cmdfn,"w") do |f|
parallel_cmds.each do |c|
f.puts(c)
end
end
cmd = "cat \"#{cmdfn}\""
- err = execute.call(cmd+"|parallel --joblog #{joblog}") # first try optimistically to run all jobs in parallel
+ debug.call("tmpdir=#{tmpdir}")
+ err,stats = execute.call(cmd+"|parallel --results #{tmpdir} --joblog #{joblog}") # first try optimistically to run all jobs in parallel
if err != 0
- [16,8,4,1].each do |jobs|
+ [4,1].each do |jobs|
info.call("Failed to complete parallel run -- retrying with smaller RAM footprint!")
- err = execute.call(cmd+"|parallel -j #{jobs} --resume --joblog #{joblog}")
+ err,stats = execute.call(cmd+"|parallel -j #{jobs} --results #{tmpdir} --resume --joblog #{joblog}")
break if err == 0
end
if err != 0
info.call("Parallel run failed!")
debug.call("Job log is: ",File.read(joblog))
- # Remove remaining files
- FileUtils.mv joblog, joblog+".bak", verbose: false, force: true
- FileUtils.rm_rf("#{tmpdir}/*", secure: true)
exit err
end
end
end
info.call("Run successful!")
- FileUtils.mv joblog, joblog+".bak", verbose: false, force: true
end
-json_out.call
-# copy all output files to the cache_dir. If a file exists only emit a warning
-Dir.glob("*.txt", base: tmpdir) do | fn |
- source = tmpdir + "/" + fn
- dest = options[:cache_dir] + "/" + fn
- if not File.exist?(dest) or options[:force]
- info.call "Move #{source} to #{dest}"
- FileUtils.mv source, dest, verbose: false
- else
- warning.call "File #{dest} already exists. Not overwriting"
+# Collect stats from parallel run
+
+run_stats = {}
+$stderr.print "STATS"
+Dir.glob(tmpdir+'/*/*' ).each do | dir |
+ File.open("#{dir}/stderr") { |f|
+ run_stat = parse_stats.call(f.read)
+ chr = run_stat[:chr]
+ run_stats[chr] = run_stat
+ }
+end
+# Now add up the stats
+user_time = 0.0
+system_time = 0.0
+wall_clock = "0"
+ram_usage_gb = 0.0
+run_stats.each do | k, v |
+ wall_clock=v[:wall_clock] if v[:wall_clock]>wall_clock
+ ram_usage_gb += v[:ram_usage_gb]
+ user_time += v[:user_time]
+ system_time += v[:system_time]
+end
+
+record[:user_time] = user_time
+record[:system_time] = system_time
+record[:wall_clock] = wall_clock
+record[:ram_usage_gb] = ram_usage_gb.round(2)
+record[:run_stats] = run_stats
+
+if create_archive
+ if DO_COMPUTE_GWA
+ LMDB = tmpdir+"/"+HASH+'.mdb'
+ # create lmdb database - we call out into a python script for that.
+ # first create a JSON record
+
+ meta = {
+ type: "gemma-wrapper",
+ version: version,
+ population: options[:population],
+ name: options[:name],
+ trait: options[:trait],
+ url: "https://genenetwork.org/show_trait?trait_id="+options[:trait]+"&dataset="+options[:name],
+ archive_GRM: json_in["archive"],
+ archive_GWA: File.basename(ARCHIVE),
+ }
+ if options[:id] and options[:id] =~ /,/ # this is GN specific
+ dataid,probesetid,probesetfreezeid = options[:id].split(",")
+ meta[:dataid] = dataid.to_i
+ meta[:probesetid] = probesetid.to_i
+ meta[:probesetfreezeid] = probesetfreezeid.to_i
+ end
+ record[:meta] = meta
+ metafn = tmpdir+"/"+HASH+"-meta.json"
+ File.write(metafn,record.to_json)
+ # sleep 10_000
+ if options[:lmdb]
+ File.unlink(LMDB) if File.exist?(LMDB) # removed any cached lmdb
+ execute.call "python3 #{BIN}/gemma2lmdb.py --db=#{LMDB} --meta=#{metafn} #{tmpdir}/*assoc.txt"
+ end
+ if not options[:keep]
+ execute.call "rm -f #{tmpdir}/1/*/* #{tmpdir}/*.txt #{tmpdir}/*.log #{tmpdir}/*.mdb-lock" # remove GEMMA output files
+ FileUtils.rm_rf("#{tmpdir}/1", secure: true)
+ end
end
+ File.write(tmpdir+"/"+HASH+"-gemma-wrapper-output.json",json_out.call)
+ info.call "Creating archive #{ARCHIVE}..."
+ execute.call "env XZ_OPT='-T0' tar -cvJf #{ARCHIVE} -C #{tmpdir} ."
end
end # tmpdir
+
+record[:archive] = File.basename(ARCHIVE)
+
+print json_out.call