require 'types'
require 'scbi_fasta'
require 'scbi_blast'
require 'sequence'
require 'exonerate_result'
require 'fln_stats'
require 'reptrans'

include FlnStats

class MyWorkerManagerFln < ScbiMapreduce::WorkManager

  #############################################################################################
  # MANAGER INITIALIZATION
  #############################################################################################

  attr_accessor :seqs_annotation_prot, :seqs_some_coding, :seqs_unknown

  # open files and prepare global data
  def self.init_work_manager(options)
    @@stats_hash = initialize_stats_hash
    @@stats_taxonomy = {}
    @@stats_different_prot_id = []
    @@stats_different_prot_id_complete_seqs = []
    @@options = options
    $verbose = options[:verbose]

    input_file = options[:fasta]

    if !File.exists?('fln_results')
      Dir.mkdir('fln_results')
    end

    @@func_annot_type = {
      :go_id => 5,
      :go_description => 6,
      :kegg_id => 7,
      :interpro_id => 8,
      :interpro_description => 9,
      :ec_id => 10,
      :pfam_id => 11,
      :pfam_desc => 12,
      :unipathway_id => 13
    }

    @@functional_annotations = {}
    @@functional_annotations.merge!(load_functional_annotations(File.join(ENV['BLASTDB'], 'sp_'+options[:tax_group], 'sp_'+options[:tax_group]+'.index')))
    #@@functional_annotations.merge!(load_functional_annotations(File.join(ENV['BLASTDB'], 'tr_'+options[:tax_group],'tr_'+options[:tax_group]+'.index'))) if options[:acess_db].include?('t')

    @@fasta_file = FastaQualFile.new(input_file, '')

    file_head = "Query_id\tfasta_length\tSubject_id\tdb_name\tStatus\te_value\tp_ident\ts_length\tprotein_length\tWarning_msgs\tframe\tORF_start\tORF_end\ts_start\ts_end\tDescription\tgo_id\tgo_description\tkegg_id\tinterpro_id\tinterpro_description\tec_id\tpfam_id\tpfam_description\tunipathway_id"

    @@output_files = {}

    # Seq annotation files
    if !options[:chimera].nil?
      @@output_files[CHIMERA] = File.open("fln_results/chimeric_sequences.txt", 'w')
      @@output_files[CHIMERA].puts file_head
    elsif File.exists?("fln_results/chimeric_sequences.txt")
      File.delete("fln_results/chimeric_sequences.txt")
    end
    @@output_files[OTHER] = File.open('fln_results/artifact_other.txt', 'w')
    @@output_files[MISASSEMBLED] = File.open('fln_results/misassembled.txt', 'w')
    @@output_files[UNKNOWN] = File.open('fln_results/unknown.txt', 'w')
    @@output_files['db'] = File.open('fln_results/pt_seqs', 'w')
    @@output_files[CODING] = File.open('fln_results/new_coding.txt', 'w')
    @@output_files[NCRNA] = File.open('fln_results/nc_rnas.txt', 'w')

    # Complementary files
    @@output_files['align'] = File.open('fln_results/alignments.txt', 'w')
    @@output_files['prot'] = File.open('fln_results/proteins.fasta', 'w') # FASTA
    @@output_files['nts'] = File.open("fln_results/nt_seq.txt", 'w')
    @@output_files['seqs'] = File.open('fln_results/unigenes.fasta', 'w') # FASTA
    @@output_files['stats_html'] = File.open('fln_results/summary_stats.html', 'w')
    @@output_files['stats_txt'] = File.open('fln_results/summary_stats.txt', 'w')

    @@output_files[CODING].puts file_head
    @@output_files['db'].puts file_head
    @@output_files[NCRNA].puts file_head

    # RepTrans module
    @@seqs_annotation_prot = []
    @@seqs_some_coding = []
    @@seqs_unknown = []

    # Transdecoder module
    @@complete_sure = []
    @@seqs_to_analyze = []
  end

  #############################################################################################
  # MANAGER TERMINATION
  #############################################################################################

  # close files
  def self.end_work_manager
    orf_prediction_with_transdecoder if @@options[:acess_db].include?('p') && !@@complete_sure.empty? && !@@seqs_to_analyze.empty?
    write_summary_stats(@@stats_hash, @@stats_taxonomy, @@stats_different_prot_id, @@stats_different_prot_id_complete_seqs, @@output_files['stats_txt'], @@output_files['stats_html'])
    @@output_files.each do |key, handler|
      handler.close
    end
  end
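
  # The methods in the next section are the ScbiMapreduce::WorkManager callbacks
  # that drive the whole run. A rough sketch of the lifecycle (the actual
  # scheduling is handled by the scbi_mapreduce framework, not by this file):
  #
  #   MyWorkerManagerFln.init_work_manager(options)  # open result files, load annotations
  #   loop: next_work            -> one Sequence per input FASTA record (nil when exhausted)
  #         work_received(objs)  -> classify, annotate and write each finished chunk
  #   MyWorkerManagerFln.end_work_manager            # TransDecoder pass, stats, close files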

  #############################################################################################
  # MANAGER NATIVE FUNCTIONS
  #############################################################################################

  # This method is called every time a worker needs new data; it is executed as
  # many times as the chunk size requires.
  # Returns the work data, or nil if no more data is available.
  def next_work
    # Manage worker input
    n, f, q = @@fasta_file.next_seq
    if !n.nil?
      @@stats_hash['input_seqs'] += 1
      return Sequence.new(n, f, q)
    else
      return nil
    end
  end

  # This method is executed each time an object is finished
  def work_received(objs)
    # Manage worker output
    objs.each do |seq|
      transdecoder_keep_seq(seq)
      repTrans_keep_seq(seq)
      if seq.type > UNKNOWN && seq.type < NCRNA
        get_taxonomy(seq.hit.definition, @@stats_taxonomy)
        get_functional_annotations(seq)
      end
      write_seq(seq) if @@options[:acess_db].include?('c') || !@@options[:acess_db].include?('p') || (seq.type != UNKNOWN && seq.type != CODING) # Don't write unknown or coding sequences when TransDecoder is used
    end
    @@stats_hash, @@stats_different_prot_id, @@stats_different_prot_id_complete_seqs = summary_stats(objs, @@stats_hash, @@stats_different_prot_id, @@stats_different_prot_id_complete_seqs)
  end

  def error_received(worker_error, obj)
    puts "WARNING: CHUNK FAILED. Error while processing object #{obj.first.seq_name}\n" + worker_error.original_exception.message + ":\n" + worker_error.original_exception.backtrace.join("\n")
  end

  def too_many_errors_received
    $LOG.error "Too many errors: #{@@error_count} errors on #{@@count} executed sequences, exiting before finishing"
  end

  # send initial config to workers
  def worker_initial_config
    return @@options
  end

  #############################################################################################
  # CUSTOM FUNCTIONS
  #############################################################################################

  def self.load_functional_annotations(annotation_file)
    functional_annotations = {}
    File.open(annotation_file).each do |line|
      line.chomp!
      fields = line.split("\t")
      acc = fields.shift
      functional_annotations[acc] = fields
    end
    return functional_annotations
  end

  def get_functional_annotations(seq)
    all_info = @@functional_annotations[seq.hit.acc.gsub(/-\d+/, '')] # gsub removes the splicing suffix from the UniProt accession
    if !all_info.nil?
      annotations = {}
      @@func_annot_type.each do |type, position|
        annotations[type] = all_info[position]
      end
      seq.functional_annotations = annotations
    end
  end

  # write results to files
  def write_seq(seq)
    begin
      seq.write_info(@@output_files)
    rescue Exception => e
      puts "Error printing #{seq.seq_name}"
      puts e.message, e.backtrace.join("\n")
    end
  end

  def repTrans_keep_seq(seq)
    if !@@options[:reptrans].nil?
      case seq.type
      when COMPLETE .. INTERNAL
        @@seqs_annotation_prot << seq
      when CODING
        @@seqs_some_coding << seq
      when UNKNOWN
        @@seqs_unknown << seq
      end
    end
  end
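
  # Class-level counterpart of repTrans_keep_seq above. It applies the same
  # classification rules, but is callable from the class method
  # orf_prediction_with_transdecoder, which re-classifies sequences after the
  # TransDecoder pass (the instance version is only reachable from work_received).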
  def self.repTrans_keep_seq(seq)
    if !@@options[:reptrans].nil?
      case seq.type
      when COMPLETE .. INTERNAL
        @@seqs_annotation_prot << seq
      when CODING
        @@seqs_some_coding << seq
      when UNKNOWN
        @@seqs_unknown << seq
      end
    end
  end

  def transdecoder_keep_seq(seq)
    if @@options[:acess_db].include?('p')
      case seq.type
      when COMPLETE
        @@complete_sure << seq if seq.status && seq.hit.ident >= @@options[:training_ident]
      when CODING
        @@seqs_to_analyze << seq
      when UNKNOWN
        @@seqs_to_analyze << seq
      end
    end
  end

  def self.orf_prediction_with_transdecoder
    clusters_seqs_annot_prot = clustering_by_id(@@complete_sure)
    final_seqs = select_representative(clusters_seqs_annot_prot)
    coding_info = nil
    Dir.chdir('temp') do
      orfs = get_seqs(final_seqs)
      File.open('training_set.fasta', 'w') {|f| f.write(orfs)}
      orfs = get_seqs(@@seqs_to_analyze)
      File.open('analyse_set.fasta', 'w') {|f| f.write(orfs)}
      cmd = "TransDecoder -t analyse_set.fasta --workdir transdecoder --train training_set.fasta"
      cmd << ' --reuse' if Dir.exists?('transdecoder')
      system(cmd)
      coding_info = get_coding_info('transdecoder/longest_orfs.pep')
      coding_info = get_scores('transdecoder/longest_orfs.cds.scores', coding_info)
      coding_info = correct_by_selected('transdecoder/longest_orfs.cds.scores.selected', coding_info)
    end
    @@seqs_to_analyze.each do |seq|
      coding = coding_info[seq.seq_name]
      asign_coding_attributes(seq, coding) if !coding.nil?
      repTrans_keep_seq(seq)
      seq.write_info(@@output_files)
    end
  end

  def self.get_seqs(seqs)
    all_seqs = ''
    seqs.each do |seq|
      all_seqs << ">#{seq.seq_name}\n#{seq.seq_fasta}\n"
    end
    return all_seqs
  end

  def self.correct_by_selected(selected, coding_info)
    seqs_selected = []
    File.open(selected).each do |line|
      line.chomp!
      seq_name, orf_id = line.split('|', 2)
      seqs_selected << orf_id
    end
    coding_info.each do |seq_name, orfs|
      orfs.each do |orf, info|
        info[1] = '-' if !seqs_selected.include?(orf)
      end
    end
    return coding_info
  end

  def self.asign_coding_attributes(seq, coding)
    seq.type = CODING
    @@stats_hash['unknown'] -= 1
    @@stats_hash['unknown_>200'] -= 1 if seq.seq_fasta.length > 200
    @@stats_hash['unknown_>500'] -= 1 if seq.seq_fasta.length > 500
    @@stats_hash['coding_>200'] += 1 if seq.seq_fasta.length > 200
    @@stats_hash['coding_>500'] += 1 if seq.seq_fasta.length > 500
    @@stats_hash['coding'] += 1
    coding = select_orf(coding)
    if coding[1] == 'complete'
      seq.status = TRUE
      @@stats_hash['coding_sure'] += 1
    else
      @@stats_hash['coding_putative'] += 1
    end
    seq.t_code = coding.last
    ind = 2
    ind = 3 if coding[4] == '-'
    frame = (coding[ind] % 3) + 1
    frame = frame * -1 if coding[4] == '-'
    seq.hit = [coding[2], coding[3], frame]
  end

  def self.select_orf(orfs_hash)
    orf = nil
    ratioX = get_min_Xratio(orfs_hash)
    orfs_hash.select!{|id, info| info.first == ratioX}
    orfs = orfs_hash.select{|id, info| info[1] == 'complete'}
    orfs = orfs_hash if orfs.empty?
    max_score = get_max_score(orfs)
    orfs.select!{|id, info| info.last == max_score}
    orf = orfs.values.first
    return orf
  end

  def self.get_max_score(orfs_hash)
    score = nil
    orfs_hash.each do |id, info|
      local = info.last
      if score.nil?
        score = local
      else
        score = local if local > score
      end
    end
    return score
  end
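
  # Notes on the TransDecoder data flow above (a descriptive sketch; the example
  # values are hypothetical):
  #
  # get_coding_info and get_scores below build coding_info as a nested hash
  #   { seq_name => { orf_id => [x_ratio, type, start, stop, strand, score] } }
  # and select_orf picks one ORF per sequence by (1) minimum X ratio,
  # (2) preferring 'complete' ORFs, (3) maximum TransDecoder score, e.g.:
  #
  #   { 'orf_1' => [0.0, 'complete',       10, 310, '+', 55.2],
  #     'orf_2' => [0.0, '5prime_partial',  5, 250, '+', 60.1] }
  #   # => the 'orf_1' info array wins: 'complete' is preferred over the
  #   #    higher-scoring partial ORF.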
  def self.get_min_Xratio(orfs_hash)
    ratio = nil
    orfs_hash.each do |id, info|
      local = info.first
      if ratio.nil?
        ratio = local
      else
        ratio = local if local < ratio
      end
    end
    return ratio
  end

  def self.get_coding_info(file_name)
    coding_info = {}
    begin
      FastaQualFile.new(file_name, '').each do |name, seq, comments, qual|
        seq_length = seq.length
        f_len = seq.length.to_f
        x_len = seq.count('X')
        seq_name, orf_id = name.split('|')
        comments =~ /type:(\S+)/
        type = $1
        comments =~ /:(\d+)-(\d+)\(([+-])\)/
        start = $1.to_i
        stop = $2.to_i
        strand = $3
        record = coding_info[seq_name]
        info = [x_len / f_len, type, start, stop, strand]
        if record.nil?
          coding_info[seq_name] = {orf_id => info}
        else
          record[orf_id] = info
        end
      end
    rescue
      puts "Warning: TransDecoder output file is missing. Check that TransDecoder is installed"
    end
    return coding_info
  end

  def self.get_scores(file_name, coding_info)
    File.open(file_name).each do |line|
      line.chomp!
      fields = line.split("\t")
      name = fields.shift
      seq, orf_id = name.split('|')
      coding = coding_info[seq]
      if !coding.nil?
        orf = coding[orf_id]
        if !orf.nil?
          score = fields.first.to_f
          if score > 0
            orf << fields.first.to_f
          else
            coding.delete(orf_id)
            coding_info.delete(seq) if coding.empty?
          end
        end
      end
    end
    return coding_info
  end

  def self.get_annotations
    return @@seqs_annotation_prot, @@seqs_some_coding, @@seqs_unknown
  end

end
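
# Usage sketch (for orientation only; the actual driver is presumably provided
# by the full_lengther_next executable and the scbi_mapreduce framework, not by
# this file):
#
#   MyWorkerManagerFln.init_work_manager(options)
#   # scbi_mapreduce then repeatedly calls next_work to hand Sequence objects to
#   # the workers (configured via worker_initial_config) and returns finished
#   # chunks through work_received ...
#   MyWorkerManagerFln.end_work_manager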