#! /usr/bin/env ruby # # gemma-wrapper # Author:: Pjotr Prins # License:: GPL3 # # Copyright (C) 2017-2024 Pjotr Prins USAGE = " GEMMA wrapper example: Simple caching of K computation with gemma-wrapper -- \ -g test/data/input/BXD_geno.txt.gz \ -p test/data/input/BXD_pheno.txt \ -a test/data/input/BXD_snps.txt \ -gk > K.json LOCO K computation with caching and JSON output gemma-wrapper --json --loco -- \ -g test/data/input/BXD_geno.txt.gz \ -p test/data/input/BXD_pheno.txt \ -a test/data/input/BXD_snps.txt \ -gk -debug > K.json LMM's using the K's captured in K.json using the --input switch gemma-wrapper --json --loco --input K.json -- \ -g test/data/input/BXD_geno.txt.gz \ -p test/data/input/BXD_pheno.txt \ -c test/data/input/BXD_covariates2.txt \ -a test/data/input/BXD_snps.txt \ -lmm 9 -maf 0.1 \ -debug > GWA.json Gemma gets used from the path. You can override by setting env GEMMA_COMMAND=path/bin/gemma gemma-wrapper ... " # These are used for testing compatibility with the gemma tool GEMMA_V_MAJOR = 98 GEMMA_V_MINOR = 4 basepath = File.dirname(File.dirname(__FILE__)) $: << File.join(basepath,'lib') BIN = File.join(basepath,'bin') VERSION_FILENAME=File.join(basepath,'VERSION') version = File.new(VERSION_FILENAME).read.chomp # ---- GEMMA path gemma_command = ENV['GEMMA_COMMAND'] # Look for gemma if not gemma_command ENV['PATH'].split(':').each do | path | try_bin = path + '/' + 'gemma' if File.executable?(try_bin) gemma_command = try_bin break end end end hashme = nil require 'digest/sha1' require 'fileutils' require 'optparse' require 'open3' require 'socket' # for hostname require 'tempfile' require 'time' require 'tmpdir' require 'lock' split_at = ARGV.index('--') if split_at gemma_args = ARGV[split_at+1..-1] end options = { show_help: false, source: 'https://github.com/genetics-statistics/gemma-wrapper', version: version+' (Pjotr Prins)', date: Time.now.to_s, gemma_command: gemma_command, cache_dir: Dir.tmpdir(), quiet: false, permute_phenotypes: false, lmdb: nil, parallel: nil } opts = OptionParser.new do |o| o.banner = "\nUsage: #{File.basename($0)} [options] -- [gemma-options]" o.on('--permutate n', Integer, 'Permutate # times by shuffling phenotypes') do |lst| options[:permutate] = lst options[:force] = true end o.on('--permute-phenotypes filen',String, 'Phenotypes to be shuffled in permutations') do |phenotypes| options[:permute_phenotypes] = phenotypes raise "Phenotype input file #{phenotypes} does not exist" if !File.exist?(phenotypes) end o.on('--loco', 'Run full leave-one-chromosome-out (LOCO)') do |b| options[:loco] = b end o.on('--population NAME', 'Add population identifier to metadata') do |n| options[:population] = n end o.on('--name NAME', 'Add dataset identifier to metadata') do |n| options[:name] = n end o.on('--id ID', 'Add identifier to metadata') do |n| options[:id] = n end o.on('--trait TRAIT', 'Add trait identifier to metadata') do |n| options[:trait] = n end o.on('--chromosomes [1,2,3]',Array,'Run specific chromosomes') do |lst| options[:chromosomes] = lst end o.on('--input filen',String, 'JSON input variables (used for LOCO)') do |filen| options[:input] = filen raise "JSON input file #{filen} does not exist" if !File.exist?(filen) end o.on('--cache-dir path',String, 'Use a cache directory') do |path| options[:cache_dir] = path end o.on('--json', 'Create output file in JSON format') do |b| options[:json] = b end o.on("--force", "Force computation (override cache)") do |q| options[:force] = true end o.on("--keep", "Keep intermediate files in output") do |q| options[:keep] = true end o.on("--parallel", "Run jobs in parallel") do |b| options[:parallel] = true end o.on("--no-parallel", "Do not run jobs in parallel") do |b| options[:parallel] = false end o.on("--lmdb", "Generate lmdb output") do |b| options[:lmdb] = true end o.on("--slurm[=opts]",String,"Use slurm PBS for submitting jobs") do |slurm| options[:slurm_opts] = "" options[:slurm] = true if slurm options[:slurm_opts] = slurm end end o.on("--q", "--quiet", "Run quietly") do |q| options[:quiet] = true end o.on("-v", "--verbose", "Run verbosely") do |v| options[:verbose] = true end o.on("-d", "--debug", "Show debug messages and keep intermediate output") do |v| options[:debug] = true end o.on("--dry-run", "Show commands, but don't execute") do |b| options[:dry_run] = b end o.on('--','Anything after gets passed to GEMMA') do o.terminate() end o.separator "" o.on_tail('-h', '--help', 'display this help and exit') do options[:show_help] = true end end opts.parse!(ARGV) # ---- Output handlers OUTPUT = (options[:json] ? $stderr : $stdout ) record = { warnings: [], errno: 0, debug: [] } record[:name] = options[:name] if options[:name] record[:id] = options[:id] if options[:id] record[:trait] = options[:trait] if options[:trait] d = DateTime.now record[:time] = d.strftime("%Y/%m/%d %H:%M") record[:user] = ENV["USER"] record[:hostname] = Socket.gethostname require 'json' json_out = lambda do record.to_json if options[:json] end # ---- Some error handlers error = lambda do |*msg| if options[:json] record[:error] = *msg.join(" ") record[:errno] = 1 json_out.call end raise *msg end debug = lambda do |*msg| if options[:debug] record[:debug].push *msg.join("") if options[:json] OUTPUT.print "DEBUG: ",*msg,"\n" end end warning = lambda do |*msg| record[:warnings].push *msg.join("") OUTPUT.print "WARNING: ",*msg,"\n" end info = lambda do |*msg| record[:debug].push *msg.join("") if options[:json] and options[:debug] OUTPUT.print *msg,"\n" if !options[:quiet] end # Fetch chromosomes def get_chromosomes annofn h = {} File.open(annofn,"r").each_line do | line | chr = line.split(/\s+/)[2] h[chr] = true end h.map { |k,v| k } end # ---- Start banner GEMMA_K_VERSION=version GEMMA_K_BANNER = "gemma-wrapper #{version} (Ruby #{RUBY_VERSION}) by Pjotr Prins 2017-2024\n" info.call GEMMA_K_BANNER # Check gemma version begin gemma_command2 = options[:gemma_command] # info.call "NOTE: gemma-wrapper is soon to be replaced" debug.call("Invoke #{gemma_command2}") GEMMA_INFO = `#{gemma_command2}` rescue Errno::ENOENT gemma_command2 = "gemma" if not gemma_command2 error.call "<#{gemma_command2}> command not found" end gemma_version_header = GEMMA_INFO.split("\n").grep(/GEMMA|Version/)[0].strip info.call "Using ",gemma_version_header,"\n" gemma_version = gemma_version_header.split(/[,\s]+/)[1] v_version, v_major, v_minor = gemma_version.split(".") info.call "Found #{gemma_version}, comparing against expected v0.#{GEMMA_V_MAJOR}.#{GEMMA_V_MINOR}" info.call gemma_version_header warning.call "GEMMA version is out of date. Update GEMMA to 0.#{GEMMA_V_MAJOR}.#{GEMMA_V_MINOR}!" if v_major.to_i < GEMMA_V_MAJOR or (v_major.to_i == GEMMA_V_MAJOR and (v_minor != nil and v_minor.to_i < GEMMA_V_MINOR)) options[:gemma_version_header] = gemma_version_header options[:gemma_version] = gemma_version if options[:show_help] or gemma_args == nil print opts print USAGE exit 1 end if RUBY_VERSION =~ /^1/ warning "does not run on Ruby 1.x\n" end # ---- LOCO defaults to parallel if options[:parallel] == nil options[:parallel] = true if options[:loco] end debug.call(options) # some debug output debug.call(record) DO_COMPUTE_KINSHIP = gemma_args.include?("-gk") DO_COMPUTE_GWA = !DO_COMPUTE_KINSHIP if options[:parallel] begin skip_cite = `echo "will cite" |parallel --citation` debug.call(skip_cite) PARALLEL_INFO = `parallel --help` rescue Errno::ENOENT error.call " command not found" end parallel_cmds = [] if not options[:json] error.call " needs --json switch" end end # ---- Fetch chromosomes from SNP annotation file anno_idx = gemma_args.index '-a' raise "Expected GEMMA -a genotype file switch" if anno_idx == nil CHROMOSOMES = get_chromosomes(gemma_args[anno_idx+1]) # ---- Compute HASH on inputs geno_idx = gemma_args.index '-g' raise "Expected GEMMA -g genotype file switch" if geno_idx == nil pheno_idx = gemma_args.index '-p' if DO_COMPUTE_GWA and options[:permute_phenotypes] raise "Did not expect GEMMA -p phenotype whith permutations (only use --permutate-phenotypes)" if pheno_idx end matches = { chr: [:string, /-loco (\S+) /], user_time: [:float, /User time \(seconds\): ([\d\.]+)/], system_time: [:float, /System time \(seconds\): ([\d\.]+)/], perc_cpu: [:int, /Percent of CPU this job got: (\d+)%/], wall_clock: [:string, /Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (\S+)/], ram_usage_gb: [:gb, /Maximum resident set size \(kbytes\): (\d+)/], command: [:string, /Command being timed: (.+)/] } parse_stats = lambda { |buf| stats = {} buf.split("\\n").each do |s| if s =~ /^\t/ matches.each do |k,v| type,m = v if s =~ m # $stderr.print $1,s stats[k] = case type when :float $1.to_f when :int $1.to_i when :gb (($1.to_f)/1048576.0).round(3) else $1 end end end end end stats } run_stat = {} execute = lambda { |cmd| info.call("Executing: #{cmd}") err = 0 stdout_buf = "" stderr_buf = "" stats = {} Open3.popen3("time -v #{cmd}") do |stdin,stdout,stderr,wait_thr| stderr_buf = stderr.read stdout_buf = stdout.read stats = parse_stats.call(stderr_buf) stdin.close stdout.close stderr.close err = wait_thr.value end $stderr.print(stderr_buf) if options[:debug] if err and err != 0 $stderr.print(stdout_buf) $stderr.print(stderr_buf) if not options[:debug] $stderr.print "FATAL ERROR: gemma-wrapper bailed out with #{err}\n" # sleep 10_000 $stderr.print Kernel.caller().join("\n") exit 1 end return err,stats } compute_hash = lambda do | phenofn = nil | # Compute a HASH on the inputs error.call "Hash is empty" if hashme == nil or hashme == [] debug.call "Hashing on ",hashme," before phenofn inject" hashes = [] hm = if phenofn hashme + ["-p", phenofn] else hashme end debug.call("Hashing on ",hm) hm.each do | item | # if entry is a file use the hash of its content, otherwise just the entry itself if File.file?(item) hashes << Digest::SHA1.hexdigest(File.read(item)) debug.call [item,hashes.last] else hashes << item end end debug.call(hashes) Digest::SHA1.hexdigest hashes.join(' ') end error.call "Do not use the GEMMA -o switch!" if gemma_args.include? '-o' error.call "Do not use the GEMMA -outdir switch!" if gemma_args.include? '-outdir' GEMMA_ARGS_HASH = gemma_args.dup # do not include outdir hashme = if DO_COMPUTE_KINSHIP and pheno_idx != nil # Remove the phenotype file from the hash for GRM computation GEMMA_ARGS_HASH[0..pheno_idx-1] + GEMMA_ARGS_HASH[pheno_idx+2..-1] else GEMMA_ARGS_HASH end HASH = compute_hash.call() options[:compute_hash_on] = hashme options[:hash] = HASH at_exit do Lock.release(HASH) end Lock.create(HASH) # this will wait for a lock to expire JOBLOG = HASH+"-parallel.log" # Create cache dir FileUtils::mkdir_p options[:cache_dir] Dir.mktmpdir do |tmpdir| # tmpdir for GEMMA output gemma_args << '-outdir' gemma_args << tmpdir GEMMA_ARGS = gemma_args debug.call "Options: ",options,"\n" if !options[:quiet] invoke_gemma = lambda do |extra_args, cache_hit = false, chr = "full", permutation = 1| cmd = "time -v #{gemma_command2} #{extra_args.join(' ')} #{GEMMA_ARGS.join(' ')}" record[:gemma_command] = cmd return if cache_hit if options[:slurm] info.call cmd hashi = HASH prefix = tmpdir+'/'+hashi scriptfn = prefix+".#{chr}.#{permutation}-pbs.sh" script = "#!/bin/bash #SBATCH --job-name=gemma-#{scriptfn} #SBATCH --ntasks=1 #SBATCH --time=20:00 srun #{cmd} " debug.call(script) File.open(scriptfn,"w") { |f| f.write(script) } cmd = "sbatch "+options[:slurm_opts] + scriptfn end errno = if options[:json] # capture output err = 0 if options[:dry_run] info.call("Would have invoked: ",cmd) elsif options[:parallel] info.call("Add parallel job: ",cmd) parallel_cmds << cmd else err,stats = execute.call(cmd) end err else if options[:dry_run] info.call("Would have invoked ",cmd) 0 else debug.call("Invoking ",cmd) if options[:debug] system(cmd) $?.exitstatus end end if errno != 0 debug.call "Gemma exit ",errno record[:errno] = errno json_out.call raise "exit on GEMMA error #{errno}" end end create_archive = false # Takes the hash value and checks whether the (output) file exists # returns datafn, logfn, cache_hit cache = lambda do | chr, ext, h=HASH, permutation=0 | inject = (chr==nil ? "" : ".#{chr}" )+ext hashi = (chr==nil ? h : h+inject) prefix = options[:cache_dir]+'/'+hashi+(permutation!=0 ? "."+permutation.to_s : "") # for chr 3 and permutation 1 forms something like # /tmp/1b700-a996f.3.cXX.txt.1.log.txt logfn = prefix+".log.txt" datafn = prefix+ext record[:files] ||= [] log_basefn = File.basename(logfn) data_basefn = File.basename(datafn) log_tmpfn = tmpdir+"/"+log_basefn data_tmpfn = tmpdir+"/"+data_basefn record[:files].push [chr,log_basefn,data_basefn] if !options[:force] info.call "Checking for #{data_tmpfn}" if File.exist? log_tmpfn and File.exist? data_tmpfn if File.read(log_tmpfn).include? "total computation time" record[:cache_hit] = true info.call "#{logfn} CACHE HIT!\n" return hashi, true end end end return hashi,false end # ---- Compute K kinship = lambda do | chr = nil | record[:type] = "K" ext = case (GEMMA_ARGS[GEMMA_ARGS.index('-gk')+1]).to_i when 0,1 then '.cXX.txt' when 2 then '.sXX.txt' else error.call "Unknown kinship type" end # ---- check cache: hashi, cache_hit = cache.call chr,ext if not cache_hit create_archive = true if chr != nil invoke_gemma.call [ '-loco', chr, '-o', hashi ], cache_hit else invoke_gemma.call [ '-o', HASH ], cache_hit end end end # ---- Run GWA gwas = lambda do | chr, kfn, pfn, permutation=0 | record[:type] = "GWA" error.call "Do not use the GEMMA -k switch with gemma-wrapper - it is automatic!" if GEMMA_ARGS.include? '-k' # K is automatic # Update hash for each permutation hash = compute_hash.call(pfn) hashi, cache_hit = cache.call(chr,".assoc.txt",hash,permutation) if not cache_hit create_archive = true args = [] args << [ '-loco', chr ] if chr != nil args << [ '-k', kfn, '-o', hashi ] args << [ '-p', pfn ] if pfn invoke_gemma.call args,false,chr,permutation end end LOCO = options[:loco] if LOCO if options[:chromosomes] CHROMOSOMES = options[:chromosomes] end end json_in = nil if DO_COMPUTE_KINSHIP # compute K ARCHIVE = options[:cache_dir]+"/"+HASH+"-gemma-cXX.tar.xz" if File.exist? ARCHIVE and not options[:force] info.call "Unpack archive #{ARCHIVE}!" execute.call "tar xJf #{ARCHIVE} -C #{tmpdir}" end info.call CHROMOSOMES if LOCO CHROMOSOMES.each do |chr| info.call "Compute kinship LOCO for chr ",chr kinship.call(chr) end else kinship.call # no LOCO end else # DO_COMPUTE_GWA ARCHIVE = options[:cache_dir]+"/"+HASH+"-gemma-GWA.tar.xz" if File.exist? ARCHIVE and not options[:force] info.call "Unpack archive #{ARCHIVE}!" execute.call "env XZ_OPT='-T0' tar xJf #{ARCHIVE} -C #{tmpdir}" end begin json_in = JSON.parse(File.read(options[:input])) rescue TypeError raise "Missing JSON input file?" end raise "JSON problem, file #{options[:input]} is not -gk derived" if json_in["type"] != "K" pfn = options[:permute_phenotypes] # can be nil if LOCO k_files = json_in["files"].map { |rec| [rec[0],rec[2]] } k_files.each do | chr, kfn | # call a GWA for each chromosome kfn2 = options[:cache_dir]+"/"+kfn if not File.exist?(kfn2) and json_in["archive"] # we aim to unpack the archive once on reuse archive_grm = options[:cache_dir]+"/"+json_in["archive"] execute.call "env XZ_OPT='-T0' tar xJf #{archive_grm} -C #{options[:cache_dir]}" end gwas.call(chr,kfn2,pfn) end else kfn = json_in["files"][0][2] CHROMOSOMES.each do | chr | gwas.call(chr,tmpdir+"/"+kfn,pfn) end end # Permute if options[:permutate] ps = [] raise "You should supply --permute-phenotypes with gemma-wrapper --permutate" if not pfn File.foreach(pfn).with_index do |line, line_num| ps << line end score_list = [] debug.call(options[:permutate],"x permutations") (1..options[:permutate]).each do |permutation| $stderr.print "Iteration ",permutation,"\n" # Create a shuffled phenotype file file = File.open("phenotypes-#{permutation}","w") tmp_pfn = file.path p tmp_pfn ps.shuffle.each do | l | file.print(l) end file.close k_files.each do | chr, kfn | # call a GWA for each chromosome gwas.call(chr,kfn,tmp_pfn,permutation) end score_min = 1000.0 if false and not options[:slurm] # p [:HEY,record[:files].last] assocfn = record[:files].last[2] debug.call("Reading ",assocfn) File.foreach(assocfn).with_index do |assoc, assoc_line_num| if assoc_line_num > 0 value = assoc.strip.split(/\t/).last.to_f score_min = value if value < score_min end end end score_list << score_min end exit 0 if options[:slurm] ls = score_list.sort p ls significant = ls[(ls.size - ls.size*0.95).floor] suggestive = ls[(ls.size - ls.size*0.67).floor] p ["95 percentile (significant) ",significant,(-Math.log10(significant)).round(1)] p ["67 percentile (suggestive) ",suggestive,(-Math.log10(suggestive)).round(1)] exit 0 end end # ---- Invoke parallel if options[:parallel] # parallel_cmds = ["echo 1","sleep 1 && echo 2", "false", "echo 3"] joblog = tmpdir+"/"+JOBLOG Tempfile.open("commands.txt") do |f| cmdfn = f.path File.open(cmdfn,"w") do |f| parallel_cmds.each do |c| f.puts(c) end end cmd = "cat \"#{cmdfn}\"" debug.call("tmpdir=#{tmpdir}") err,stats = execute.call(cmd+"|parallel --results #{tmpdir} --joblog #{joblog}") # first try optimistically to run all jobs in parallel if err != 0 [4,1].each do |jobs| info.call("Failed to complete parallel run -- retrying with smaller RAM footprint!") err,stats = execute.call(cmd+"|parallel -j #{jobs} --results #{tmpdir} --resume --joblog #{joblog}") break if err == 0 end if err != 0 info.call("Parallel run failed!") debug.call("Job log is: ",File.read(joblog)) exit err end end end info.call("Run successful!") end # Collect stats from parallel run run_stats = {} $stderr.print "STATS" Dir.glob(tmpdir+'/*/*' ).each do | dir | File.open("#{dir}/stderr") { |f| run_stat = parse_stats.call(f.read) chr = run_stat[:chr] run_stats[chr] = run_stat } end # Now add up the stats user_time = 0.0 system_time = 0.0 wall_clock = "0" ram_usage_gb = 0.0 run_stats.each do | k, v | wall_clock=v[:wall_clock] if v[:wall_clock]>wall_clock ram_usage_gb += v[:ram_usage_gb] user_time += v[:user_time] system_time += v[:system_time] end record[:user_time] = user_time record[:system_time] = system_time record[:wall_clock] = wall_clock record[:ram_usage_gb] = ram_usage_gb.round(2) record[:run_stats] = run_stats if create_archive if DO_COMPUTE_GWA LMDB = tmpdir+"/"+HASH+'.mdb' # create lmdb database - we call out into a python script for that. # first create a JSON record meta = { type: "gemma-wrapper", version: version, population: options[:population], name: options[:name], trait: options[:trait], url: "https://genenetwork.org/show_trait?trait_id="+options[:trait]+"&dataset="+options[:name], archive_GRM: json_in["archive"], archive_GWA: File.basename(ARCHIVE), } if options[:id] and options[:id] =~ /,/ # this is GN specific dataid,probesetid,probesetfreezeid = options[:id].split(",") meta[:dataid] = dataid.to_i meta[:probesetid] = probesetid.to_i meta[:probesetfreezeid] = probesetfreezeid.to_i end record[:meta] = meta metafn = tmpdir+"/"+HASH+"-meta.json" File.write(metafn,record.to_json) # sleep 10_000 if options[:lmdb] File.unlink(LMDB) if File.exist?(LMDB) # removed any cached lmdb execute.call "python3 #{BIN}/gemma2lmdb.py --db=#{LMDB} --meta=#{metafn} #{tmpdir}/*assoc.txt" end if not options[:keep] execute.call "rm -f #{tmpdir}/1/*/* #{tmpdir}/*.txt #{tmpdir}/*.log #{tmpdir}/*.mdb-lock" # remove GEMMA output files FileUtils.rm_rf("#{tmpdir}/1", secure: true) end end File.write(tmpdir+"/"+HASH+"-gemma-wrapper-output.json",json_out.call) info.call "Creating archive #{ARCHIVE}..." execute.call "env XZ_OPT='-T0' tar -cvJf #{ARCHIVE} -C #{tmpdir} ." end end # tmpdir record[:archive] = File.basename(ARCHIVE) print json_out.call