#! /usr/bin/env ruby # # gemma-wrapper # Author:: Pjotr Prins # License:: GPL3 # # Copyright (C) 2017-2021 Pjotr Prins USAGE = " GEMMA wrapper example: Simple caching of K computation with gemma-wrapper -- \\ -g test/data/input/BXD_geno.txt.gz \\ -p test/data/input/BXD_pheno.txt \\ -a test/data/input/BXD_snps.txt \ -gk LOCO K computation with caching and JSON output gemma-wrapper --json --loco -- \\ -g test/data/input/BXD_geno.txt.gz \\ -p test/data/input/BXD_pheno.txt \\ -a test/data/input/BXD_snps.txt \\ -gk -debug > K.json LMM's using the K's captured in K.json using the --input switch gemma-wrapper --json --loco --input K.json -- \\ -g test/data/input/BXD_geno.txt.gz \\ -p test/data/input/BXD_pheno.txt \\ -c test/data/input/BXD_covariates2.txt \\ -a test/data/input/BXD_snps.txt \\ -lmm 2 -maf 0.1 \\ -debug > GWA.json Gemma gets used from the path. You can override by setting env GEMMA_COMMAND=path/bin/gemma gemma-wrapper ... " # These are used for testing compatibility with the gemma tool GEMMA_V_MAJOR = 98 GEMMA_V_MINOR = 4 basepath = File.dirname(File.dirname(__FILE__)) $: << File.join(basepath,'lib') VERSION_FILENAME=File.join(basepath,'VERSION') version = File.new(VERSION_FILENAME).read.chomp # ---- GEMMA path gemma_command = ENV['GEMMA_COMMAND'] # Look for gemma if not gemma_command ENV['PATH'].split(':').each do | path | try_bin = path + '/' + 'gemma' if File.executable?(try_bin) gemma_command = try_bin break end end end require 'digest/sha1' require 'fileutils' require 'optparse' require 'tempfile' require 'tmpdir' require 'lock' split_at = ARGV.index('--') if split_at gemma_args = ARGV[split_at+1..-1] end options = { show_help: false, source: 'https://github.com/genetics-statistics/gemma-wrapper', version: version+' (Pjotr Prins)', date: Time.now.to_s, gemma_command: gemma_command, cache_dir: Dir.tmpdir(), quiet: false, permute_phenotypes: false, parallel: nil } opts = OptionParser.new do |o| o.banner = "\nUsage: #{File.basename($0)} [options] -- [gemma-options]" o.on('--permutate n', Integer, 'Permutate # times by shuffling phenotypes') do |lst| options[:permutate] = lst options[:force] = true end o.on('--permute-phenotypes filen',String, 'Phenotypes to be shuffled in permutations') do |phenotypes| options[:permute_phenotypes] = phenotypes raise "Phenotype input file #{phenotypes} does not exist" if !File.exist?(phenotypes) end o.on('--loco', 'Run full leave-one-chromosome-out (LOCO)') do |b| options[:loco] = b end o.on('--chromosomes [1,2,3]',Array,'Run specific chromosomes') do |lst| options[:chromosomes] = lst end o.on('--input filen',String, 'JSON input variables (used for LOCO)') do |filen| options[:input] = filen raise "JSON input file #{filen} does not exist" if !File.exist?(filen) end o.on('--cache-dir path',String, 'Use a cache directory') do |path| options[:cache_dir] = path end o.on('--json', 'Create output file in JSON format') do |b| options[:json] = b end o.on("--force", "Force computation (override cache)") do |q| options[:force] = true end o.on("--parallel", "Run jobs in parallel") do |b| options[:parallel] = true end o.on("--no-parallel", "Do not run jobs in parallel") do |b| options[:parallel] = false end o.on("--slurm[=opts]",String,"Use slurm PBS for submitting jobs") do |slurm| options[:slurm_opts] = "" options[:slurm] = true if slurm options[:slurm_opts] = slurm end end o.on("--q", "--quiet", "Run quietly") do |q| options[:quiet] = true end o.on("-v", "--verbose", "Run verbosely") do |v| options[:verbose] = true end o.on("-d", "--debug", "Show debug messages and keep intermediate output") do |v| options[:debug] = true end o.on("--dry-run", "Show commands, but don't execute") do |b| options[:dry_run] = b end o.on('--','Anything after gets passed to GEMMA') do o.terminate() end o.separator "" o.on_tail('-h', '--help', 'display this help and exit') do options[:show_help] = true end end opts.parse!(ARGV) # ---- Output handlers OUTPUT = (options[:json] ? $stderr : $stdout ) record = { warnings: [], errno: 0, debug: [] } require 'json' json_out = lambda do print record.to_json if options[:json] end # ---- Some error handlers error = lambda do |*msg| if options[:json] record[:error] = *msg.join(" ") record[:errno] = 1 json_out.call end raise *msg end debug = lambda do |*msg| if options[:debug] record[:debug].push *msg.join("") if options[:json] OUTPUT.print "DEBUG: ",*msg,"\n" end end warning = lambda do |*msg| record[:warnings].push *msg.join("") OUTPUT.print "WARNING: ",*msg,"\n" end info = lambda do |*msg| record[:debug].push *msg.join("") if options[:json] and options[:debug] OUTPUT.print *msg,"\n" if !options[:quiet] end # Fetch chromosomes def get_chromosomes annofn h = {} File.open(annofn,"r").each_line do | line | chr = line.split(/\s+/)[2] h[chr] = true end h.map { |k,v| k } end # ---- Start banner GEMMA_K_VERSION=version GEMMA_K_BANNER = "gemma-wrapper #{version} (Ruby #{RUBY_VERSION}) by Pjotr Prins 2017-2021\n" info.call GEMMA_K_BANNER # Check gemma version begin gemma_command2 = options[:gemma_command] info.call "NOTE: gemma-wrapper is soon to be replaced by gemma2/lib" GEMMA_INFO = `#{gemma_command2}` rescue Errno::ENOENT gemma_command2 = "gemma" error.call "<#{gemma_command2}> command not found" end gemma_version_header = GEMMA_INFO.split("\n").grep(/GEMMA|Version/)[0].strip info.call "Using ",gemma_version_header,"\n" gemma_version = gemma_version_header.split(/[,\s]+/)[1] v_version, v_major, v_minor = gemma_version.split(".") info.call "Found #{gemma_version}, comparing against expected v0.#{GEMMA_V_MAJOR}.#{GEMMA_V_MINOR}" info.call gemma_version_header warning.call "GEMMA version is out of date. Update GEMMA to 0.#{GEMMA_V_MAJOR}.#{GEMMA_V_MINOR}!" if v_major.to_i < GEMMA_V_MAJOR or (v_major.to_i == GEMMA_V_MAJOR and (v_minor != nil and v_minor.to_i < GEMMA_V_MINOR)) options[:gemma_version_header] = gemma_version_header options[:gemma_version] = gemma_version if options[:show_help] or gemma_args == nil print opts print USAGE exit 1 end if RUBY_VERSION =~ /^1/ warning "runs on Ruby 2.x only\n" end # ---- LOCO defaults to parallel if options[:parallel] == nil options[:parallel] = true if options[:loco] end debug.call(options) # some debug output debug.call(record) DO_COMPUTE_KINSHIP = gemma_args.include?("-gk") DO_COMPUTE_GWA = !DO_COMPUTE_KINSHIP if options[:parallel] begin skip_cite = `echo "will cite" |parallel --citation` debug.call(skip_cite) PARALLEL_INFO = `parallel --help` rescue Errno::ENOENT error.call " command not found" end parallel_cmds = [] end # ---- Fetch chromosomes from SNP annotation file anno_idx = gemma_args.index '-a' raise "Expected GEMMA -a genotype file switch" if anno_idx == nil CHROMOSOMES = get_chromosomes(gemma_args[anno_idx+1]) # ---- Compute HASH on inputs hashme = [] geno_idx = gemma_args.index '-g' raise "Expected GEMMA -g genotype file switch" if geno_idx == nil pheno_idx = gemma_args.index '-p' if DO_COMPUTE_GWA and options[:permute_phenotypes] raise "Did not expect GEMMA -p phenotype whith permutations (only use --permutate-phenotypes)" if pheno_idx end execute = lambda { |cmd| info.call("Executing: #{cmd}") err = 0 if not options[:debug] # send output to stderr line by line IO.popen("#{cmd}") do |io| while s = io.gets $stderr.print s end io.close err = $?.to_i end else $stderr.print `#{cmd}` err = $?.to_i end err } compute_hash = lambda do | phenofn = nil | # Compute a HASH on the inputs debug.call "Hashing on ",hashme,"\n" hashes = [] hm = if phenofn hashme + ["-p", phenofn] else hashme end debug.call(hm) hm.each do | item | if File.file?(item) hashes << Digest::SHA1.hexdigest(File.read(item)) debug.call [item,hashes.last] else hashes << item end end debug.call(hashes) Digest::SHA1.hexdigest hashes.join(' ') end HASH = compute_hash.call() options[:hash] = HASH at_exit do Lock.release(HASH) end Lock.create(HASH) # this will wait for a lock to expire joblog = options[:cache_dir]+"/"+HASH+"-parallel.log" # Create cache dir FileUtils::mkdir_p options[:cache_dir] Dir.mktmpdir do |tmpdir| # tmpdir for GEMMA output error.call "Do not use the GEMMA -o switch!" if gemma_args.include? '-o' error.call "Do not use the GEMMA -outdir switch!" if gemma_args.include? '-outdir' GEMMA_ARGS_HASH = gemma_args.dup # do not include outdir gemma_args << '-outdir' gemma_args << tmpdir GEMMA_ARGS = gemma_args hashme = if DO_COMPUTE_KINSHIP and pheno_idx != nil # Remove the phenotype file from the hash for GRM computation GEMMA_ARGS_HASH[0..pheno_idx-1] + GEMMA_ARGS_HASH[pheno_idx+2..-1] else GEMMA_ARGS_HASH end debug.call "Options: ",options,"\n" if !options[:quiet] invoke_gemma = lambda do |extra_args, cache_hit = false, chr = "full", permutation = 1| cmd = "#{gemma_command2} #{GEMMA_ARGS.join(' ')} #{extra_args.join(' ')}" record[:gemma_command] = cmd return if cache_hit if options[:slurm] info.call cmd hashi = HASH prefix = tmpdir+'/'+hashi scriptfn = prefix+".#{chr}.#{permutation}-pbs.sh" script = "#!/bin/bash #SBATCH --job-name=gemma-#{scriptfn} #SBATCH --ntasks=1 #SBATCH --time=20:00 srun #{cmd} " debug.call(script) File.open(scriptfn,"w") { |f| f.write(script) } cmd = "sbatch "+options[:slurm_opts] + scriptfn end errno = if options[:json] # capture output err = 0 if options[:dry_run] info.call("Would have invoked: ",cmd) elsif options[:parallel] info.call("Add parallel job: ",cmd) parallel_cmds << cmd else err = execute.call(cmd) end err else if options[:dry_run] info.call("Would have invoked ",cmd) 0 else debug.call("Invoking ",cmd) if options[:debug] system(cmd) $?.exitstatus end end if errno != 0 debug.call "Gemma exit ",errno record[:errno] = errno json_out.call raise "exit on GEMMA error #{errno}" end end # Takes the hash value and checks whether the (output) file exists # returns datafn, logfn, cache_hit cache = lambda do | chr, ext, h=HASH, permutation=0 | inject = (chr==nil ? "" : ".#{chr}" )+ext hashi = (chr==nil ? h : h+inject) prefix = options[:cache_dir]+'/'+hashi+(permutation!=0 ? "."+permutation.to_s : "") # for chr 3 and permutation 1 forms something like # /tmp/1b700-a996f.3.cXX.txt.1.log.txt logfn = prefix+".log.txt" datafn = prefix+ext record[:files] ||= [] record[:files].push [chr,logfn,datafn] if !options[:force] if File.exist? logfn and File.exist? datafn if File.read(logfn).include? "total computation time" record[:cache_hit] = true info.call "#{logfn} CACHE HIT!\n" return hashi, true end end end return hashi,false end # ---- Compute K kinship = lambda do | chr = nil | record[:type] = "K" ext = case (GEMMA_ARGS[GEMMA_ARGS.index('-gk')+1]).to_i when 0,1 then '.cXX.txt' when 2 then '.sXX.txt' else error.call "Unknown kinship type" end hashi, cache_hit = cache.call chr,ext if not cache_hit if chr != nil invoke_gemma.call [ '-loco', chr, '-o', hashi ], cache_hit else invoke_gemma.call [ '-o', HASH ], cache_hit end end end # ---- Run GWA gwas = lambda do | chr, kfn, pfn, permutation=0 | record[:type] = "GWA" error.call "Do not use the GEMMA -k switch with gemma-wrapper - it is automatic!" if GEMMA_ARGS.include? '-k' # K is automatic # Update hash for each permutation hash = compute_hash.call(pfn) hashi, cache_hit = cache.call(chr,".assoc.txt",hash,permutation) if not cache_hit args = [ '-k', kfn, '-o', hashi ] args << [ '-loco', chr ] if chr != nil args << [ '-p', pfn ] if pfn invoke_gemma.call args,false,chr,permutation end end LOCO = options[:loco] if LOCO if options[:chromosomes] CHROMOSOMES = options[:chromosomes] end end if DO_COMPUTE_KINSHIP # compute K info.call CHROMOSOMES if LOCO CHROMOSOMES.each do |chr| info.call "LOCO for ",chr kinship.call(chr) end else kinship.call # no LOCO end else # DO_COMPUTE_GWA begin json_in = JSON.parse(File.read(options[:input])) rescue TypeError raise "Missing JSON input file?" end raise "JSON problem, file #{options[:input]} is not -gk derived" if json_in["type"] != "K" pfn = options[:permute_phenotypes] # can be nil if LOCO k_files = json_in["files"].map { |rec| [rec[0],rec[2]] } k_files.each do | chr, kfn | # call a GWA for each chromosome gwas.call(chr,kfn,pfn) end else kfn = json_in["files"][0][2] CHROMOSOMES.each do | chr | gwas.call(chr,kfn,pfn) end end # Permute if options[:permutate] ps = [] raise "You should supply --permute-phenotypes with gemma-wrapper --permutate" if not pfn File.foreach(pfn).with_index do |line, line_num| ps << line end score_list = [] debug.call(options[:permutate],"x permutations") (1..options[:permutate]).each do |permutation| $stderr.print "Iteration ",permutation,"\n" # Create a shuffled phenotype file file = File.open("phenotypes-#{permutation}","w") tmp_pfn = file.path p tmp_pfn ps.shuffle.each do | l | file.print(l) end file.close k_files.each do | chr, kfn | # call a GWA for each chromosome gwas.call(chr,kfn,tmp_pfn,permutation) end score_min = 1000.0 if false and not options[:slurm] # p [:HEY,record[:files].last] assocfn = record[:files].last[2] debug.call("Reading ",assocfn) File.foreach(assocfn).with_index do |assoc, assoc_line_num| if assoc_line_num > 0 value = assoc.strip.split(/\t/).last.to_f score_min = value if value < score_min end end end score_list << score_min end exit 0 if options[:slurm] ls = score_list.sort p ls significant = ls[(ls.size - ls.size*0.95).floor] suggestive = ls[(ls.size - ls.size*0.67).floor] p ["95 percentile (significant) ",significant,(-Math.log10(significant)).round(1)] p ["67 percentile (suggestive) ",suggestive,(-Math.log10(suggestive)).round(1)] exit 0 end end # ---- Invoke parallel if options[:parallel] # parallel_cmds = ["echo 1","sleep 1 && echo 2", "false", "echo 3"] cmd = parallel_cmds.join("\\n") cmd = "echo -e \"#{cmd}\"" err = execute.call(cmd+"|parallel --joblog #{joblog}") # first try optimistically to run all jobs in parallel if err != 0 [16,8,4,1].each do |jobs| info.call("Failed to complete parallel run -- retrying with smaller RAM footprint!") err = execute.call(cmd+"|parallel -j #{jobs} --resume --joblog #{joblog}") break if err == 0 end if err != 0 info.call("Run failed!") # Remove remaining files FileUtils.rm_rf("#{tmpdir}/*", secure: true) exit err end end info.call("Run successful!") end json_out.call # copy all output files to the cache_dir. If a file exists only emit a warning Dir.glob("*.txt", base: tmpdir) do | fn | source = tmpdir + "/" + fn dest = options[:cache_dir] + "/" + fn if not File.exist?(dest) or options[:force] info.call "Move #{source} to #{dest}" FileUtils.mv source, dest, verbose: false else warning.call "File #{dest} already exists. Not overwriting" end end end # tmpdir