#!/usr/bin/env ruby

require 'optparse'
require 'ostruct'
require 'bio-maf'
require 'bio-bgzf'

$options = OpenStruct.new
$options.dir = '.'
$options.ref_only = true
$options.n_jobs = 1
$options.force = false
$options.level = 2

op = OptionParser.new do |opts|
  opts.banner = "Usage: maf_bgzip [options] <maf> [<maf> ...]"
  opts.separator ""
  opts.separator "Options:"
  opts.on("-d", "--dir DIR",
          "Directory to write compressed MAF to",
          "(default is current directory)") do |dir|
    $options.dir = dir
  end
  opts.on("-i", "--index", "Index MAF files after writing") do
    $options.index = true
  end
  opts.on("-a", "--all",
          "Index all sequences, not just reference seq",
          "(has no effect without --index)") do
    $options.ref_only = false
  end
  opts.on("-l", "--level LEVEL", Integer,
          "gzip compression level for BGZF (1-9)") do |level|
    unless 1 <= level && level <= 9
      $stderr.puts "Invalid compression level: #{level}"
      $stderr.puts opts
      exit 2
    end
    $options.level = level
  end
  opts.on("-f", "--force", "Replace output files if they already exist") do
    $options.force = true
  end
  opts.on("-j", "--jobs N", Integer, "Run N concurrent jobs (default 1)") do |n|
    $options.n_jobs = n
  end
  Bio::MAF::handle_logging_options(opts)
end

op.parse!(ARGV)
Bio::Log::CLI.configure('bio-maf')

# Seconds between progress log messages.
INTERVAL = 10
LOG = Bio::MAF::LOG

# Build a lambda that compresses (and optionally indexes) a single MAF file,
# refusing up front to overwrite existing output unless --force was given.
def make_processing_task(maf)
  maf_base = File.basename(maf)
  base = maf_base.gsub(/\.maf.*/, '')
  bgz_path = "#{$options.dir}/#{base}.maf.bgz"
  if File.exist?(bgz_path) && ! $options.force
    LOG.error "#{bgz_path} already exists, refusing to overwrite " \
              "without --force!"
    exit 1
  end
  idx_path = nil
  if $options.index
    idx_path = "#{$options.dir}/#{base}.kct"
    if File.exist?(idx_path) && ! $options.force
      LOG.error "#{idx_path} already exists, refusing to overwrite " \
                "without --force!"
      exit 1
    end
  end
  lambda { process_maf(maf, bgz_path, idx_path) }
end

# Re-compress maf_path to BGZF at bgz_path, optionally building a Kyoto
# Cabinet index at idx_path, and log throughput statistics.
def process_maf(maf_path, bgz_path, idx_path)
  maf_base = File.basename(maf_path)
  LOG.debug { "Processing #{maf_base}." }
  p = Bio::MAF::Parser.new(maf_path, :retain_text => true)
  if idx_path
    if File.exist?(idx_path)
      File.unlink(idx_path)
    end
    idx = Bio::MAF::KyotoIndex.new(idx_path)
    idx.prep(bgz_path, :bgzf, $options.ref_only)
    exec = Bio::MAF::Executor.create
  end
  start_t = Time.now
  last_t = start_t
  last_pos = 0
  n_blocks = 0
  maf_size = File.size(maf_path)
  File.open(bgz_path, 'wb') do |out_f|
    Bio::BGZF::Writer.new(out_f, $options.level) do |bgz_w|
      maf_w = Bio::MAF::Writer.new(bgz_w)
      maf_w.write_header(p.header)
      p.each_block do |block|
        bgz_w.write(block.orig_text)
        if idx
          # Record the block's offset in the BGZF output and index it
          # asynchronously.
          block.offset = bgz_w.last_write_pos
          exec.submit do
            idx.index_blocks([block])
          end
        end
        n_blocks += 1
        if n_blocks % 100 == 0
          cur_t = Time.now
          delta_t = cur_t - last_t
          if delta_t > INTERVAL
            # Periodic progress report based on position in the input file.
            cur_pos = p.phys_f.tell
            LOG.debug {
              pos_mb = cur_pos.to_f / 1048576
              delta_bytes = cur_pos - last_pos
              rate = delta_bytes.to_f / delta_t
              mb_rate = rate / 1048576
              pct = cur_pos.to_f / maf_size * 100
              elapsed = cur_t - start_t
              sprintf("%s: processed %.1f MB (%.1f%%) in %ds, %.2f MB/s.",
                      maf_base, pos_mb, pct, elapsed, mb_rate)
            }
            last_t = cur_t
            last_pos = cur_pos
          end
        end
      end
    end
  end
  # If the input was itself compressed, p.f is the decompressed stream and
  # its position gives the uncompressed size.
  unc = p.f.tell if p.f != p.phys_f
  p.close
  if idx
    exec.shutdown
    idx.db.synchronize(true)
  end
  elapsed = Time.now - start_t
  mb = maf_size.to_f / 1048576
  mb_rate = mb / elapsed
  LOG.info {
    sprintf("Processed %s (%.1f MB) in %ds, %.2f MB/s",
            maf_base, mb, elapsed, mb_rate)
  }
  if unc
    LOG.info {
      unc_mb = unc.to_f / 1048576
      unc_rate = unc_mb / elapsed
      sprintf(" Uncompressed: %.1f MB, %.2f MB/s", unc_mb, unc_rate)
    }
  end
  LOG.info {
    raw_size = unc || maf_size
    avg_block_kb = raw_size.to_f / n_blocks / 1024
    sprintf(" %d alignment blocks, average size %.2f KB",
            n_blocks, avg_block_kb)
  }
  LOG.info {
    orig_size = unc ? unc : maf_size
    bgzf_size = File.size(bgz_path).to_f
    ratio = bgzf_size / orig_size
    sprintf(" Compressed with BGZF (level=%d) to %.1f MB (%.1fx)",
            $options.level, bgzf_size / 1048576, ratio)
  }
end

runner = Bio::MAF::JobRunner.create($options.n_jobs)
LOG.debug "Created #{runner.class} set for #{$options.n_jobs} concurrent jobs."
ARGV.each do |maf|
  task = make_processing_task(maf)
  runner.add(&task)
end
LOG.debug "Running jobs."
runner.run
LOG.debug "Finished processing."
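
# Example invocations, assuming this script is installed on PATH as
# maf_bgzip; the file and directory names below are hypothetical, but the
# flags are the ones defined by the option parser above:
#
#   maf_bgzip --dir /data/bgzf chr22.maf
#   maf_bgzip --dir /data/bgzf --index --all --jobs 4 chr*.maf
#
# Each input <name>.maf yields <name>.maf.bgz (plus <name>.kct when --index
# is given) in the --dir directory.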