require 'scbi_mapreduce'
require 'my_worker_manager_EST' # Second server
require 'fln_stats'
require 'types'

########################################################################
# MAIN FUNCTION
########################################################################

def reptrans(seqs_annotation_prot, seqs_some_coding, seqs_unknown, options)
  cpus = count_cpu(options)
  stats_hash = initialize_stats_hash_reptrans

  # Paths
  #---------------------------------------------
  main_path = File.join(Dir.pwd, 'fln_results')
  reptrans_fasta = File.join(main_path, 'Representative_transcriptome.fasta')
  blast_path = File.join(main_path, 'ESTdb')
  cluster_prot_annotated_path = File.join(main_path, 'Prot_clusters')
  cluster_EST_annotated_path = File.join(main_path, 'EST_clusters')
  html_file = File.join(main_path, 'Representative_transcriptome_stats.html')
  txt_file = File.join(main_path, 'Representative_transcriptome_stats.txt')

  # Protein-annotated sequence analysis
  #---------------------------------------------
  analysis_over_DB_annotated_seqs(seqs_annotation_prot, reptrans_fasta, cluster_prot_annotated_path, stats_hash, 'prot_annotated', options[:high_clustering])
  seqs_annotation_prot = nil

  # Sequences without protein annotation
  #---------------------------------------------
  putative_seqs = seqs_some_coding
  if !options[:est_db].nil? # WITH EST DATABASE
    putative_seqs += seqs_unknown # Coding & unknown
    putative_seqs = reduce_pool_sequences(putative_seqs, main_path, cpus)
    if !File.exist?(blast_path + '.nsq')
      $LOG.info "Start makeblastdb over EST DB"
      system("makeblastdb -in #{options[:est_db]} -out #{blast_path} -dbtype nucl -parse_seqids > #{File.join(main_path, 'log_makeblast_db')}")
      $LOG.info "Ended makeblastdb over EST DB"
    end
    putative_seqs = do_blast_with_EST(putative_seqs, options, reptrans_fasta, blast_path, cluster_EST_annotated_path, stats_hash)
  end

  # Coding sequence analysis
  #---------------------------------------------
  if !putative_seqs.nil? && !putative_seqs.empty?
    putative_seqs = select_seqs_more_500pb(putative_seqs)
    putative_seqs = reduce_pool_sequences(putative_seqs, main_path, cpus) if options[:est_db].nil? # NOT EST database
    # Order by testcode (first) and sequence length (second)
    putative_seqs.sort!{|s1, s2|
      if s2.t_code == s1.t_code
        s2.fasta_length <=> s1.fasta_length
      else
        s2.t_code <=> s1.t_code
      end
    }
    count = 0
    putative_seqs.each do |coding_seq|
      coding_stats_reptrans(coding_seq, stats_hash)
      count += 1
    end
    write_fasta(putative_seqs, reptrans_fasta, 'a')
  end
  write_reptrans_stats(stats_hash, html_file, txt_file)
end

########################################################################
# END MAIN FUNCTION
########################################################################

def analysis_over_DB_annotated_seqs(seqs_annotation_DB, reptrans_fasta, cluster_file_path, stats_hash, key_stats, pfam_clustering)
  clusters_seqs_annot_DB = clustering_by_id(seqs_annotation_DB)
  representative_seqs_annot_DB = select_representative(clusters_seqs_annot_DB)
  if pfam_clustering
    # Cluster by Pfam id. TODO: bring the annotation guide (@@func_annot_type) from my_worker_manager_fln into this scope
    clusters_seqs_annot_DB = clustering_by_annot(representative_seqs_annot_DB, :pfam_id)
    representative_seqs_annot_DB = select_representative(clusters_seqs_annot_DB) # merge clusters by id and by pfam
  end
  stats_hash[key_stats] += representative_seqs_annot_DB.length
  report_clustering(cluster_file_path, clusters_seqs_annot_DB, representative_seqs_annot_DB)
  write_fasta(representative_seqs_annot_DB, reptrans_fasta, 'w')
end

def report_clustering(cluster_file_path, clusters_seqs_annot_DB, representative_seqs_annot_DB)
  cluster_file = File.open(cluster_file_path, 'w')
  representative_seqs_annot_DB.each_with_index do |rep_seq, i|
    cluster_seqs = clusters_seqs_annot_DB[i].map{|seq| seq.seq_name}.join(';')
    cluster_file.puts "#{rep_seq.seq_name}\t#{cluster_seqs}"
  end
  cluster_file.close
end

def reduce_pool_sequences(putative_seqs, main_path, cpu)
  temp_fasta = File.join(main_path, 'temp.fasta')
  temp_fasta_clean = File.join(main_path, 'temp_cln.fasta')
  log_file = File.join(main_path, 'log_cd_hit_Cod_Unk')
  write_fasta(putative_seqs, temp_fasta, 'w')
  $LOG.info "Start cd-hit with coding and unknown sequences"
  system("cd-hit -i #{temp_fasta} -o #{temp_fasta_clean} -c 0.95 -M 0 -T #{cpu} > #{log_file}") if !File.exist?(temp_fasta_clean)
  $LOG.info "Ended cd-hit with coding and unknown sequences"
  cd_hit_names_putative_seqs = load_cd_hit_sequences_names(temp_fasta_clean)
  putative_seqs = select_seqs_with_name(putative_seqs, cd_hit_names_putative_seqs)
  return putative_seqs
end

def do_blast_with_EST(putative_seqs, options, reptrans_fasta, blast_path, cluster_EST_annotated_path, stats_hash)
  # Second server to build the representative transcriptome
  $LOG.info 'Starting server for EST analysis'
  custom_worker_file = File.join(File.dirname(ROOT_PATH), 'lib', 'full_lengther_next', 'classes', 'my_worker_EST.rb')
  options[:chimera] = nil # Disable the chimera detection on RepTrans; this trims down the BLAST output
  MyWorkerManagerEst.init_work_manager(putative_seqs, options, blast_path)
  server_EST = ScbiMapreduce::Manager.new(options[:server_ip], options[:port], options[:workers], MyWorkerManagerEst, custom_worker_file, STDOUT, FULL_LENGTHER_NEXT_INIT)
  server_EST.chunk_size = options[:chunk_size]
  server_EST.start_server
  $LOG.info 'Closing server for EST analysis'
  seqs_with_EST, putative_seqs = MyWorkerManagerEst.get_array_seqs
  if !seqs_with_EST.empty?
    # pfam_clustering set to false: EST hits are assumed to carry no Pfam annotation to cluster on
    analysis_over_DB_annotated_seqs(seqs_with_EST, reptrans_fasta, cluster_EST_annotated_path, stats_hash, 'est_annotated', false)
  end
  return putative_seqs
end

def load_cd_hit_sequences_names(file)
  names = []
  File.readlines(file).each do |line|
    if line =~ /^>/
      line.chomp!
      line.gsub!('>', '')
      names << line
    end
  end
  return names
end

def select_seqs_more_500pb(seqs_array)
  seqs = seqs_array.select{|seq| seq.fasta_length > 500} # keep sequences longer than 500 bp
  return seqs
end

def select_seqs_with_name(array_seqs, array_names)
  seqs = array_seqs.select{|seq| array_names.include?(seq.seq_name)}
  return seqs
end

def write_fasta(seqs_array, file_name, mode)
  file = File.open(file_name, mode)
  seqs_array.each do |seq|
    file.puts ">#{seq.seq_name}\n#{seq.seq_fasta}"
  end
  file.close
end

def clustering_by_id(seqs_with_hit)
  clusters = []
  hit_id = []
  seqs_with_hit.each do |seq|
    position = hit_id.index(seq.get_acc)
    if position.nil?
      hit_id << seq.get_acc
      clusters << [seq]
    else
      clusters[position] << seq
    end
  end
  return clusters
end

def clustering_by_annot(seqs_with_hit, annotation_type)
  clusters = []
  annot_id = []
  no_annotation_clusters = []
  seqs_with_hit.each do |seq|
    annot = seq.functional_annotations[annotation_type]
    annot = annot.split(';').sort.join(';') if !annot.nil?
    if annot == '-' || annot.nil?
      no_annotation_clusters << [seq]
    else
      position = annot_id.index(annot)
      if position.nil?
        annot_id << annot
        clusters << [seq]
      else
        clusters[position] << seq
      end
    end
  end
  clusters.concat(no_annotation_clusters)
  return clusters
end

def select_representative(clusters_seqs_annot_prot)
  seqs = []
  clusters_seqs_annot_prot.each do |cluster|
    if !cluster.first.coverage_analysis.empty? # filtering by mapping coverage
      max_transcript_mean_coverage = cluster.map{|seq| seq.coverage_analysis[3]}.max - 0.05 # Relaxed limit of 5%
      cluster.select!{|seq| seq.coverage_analysis[3] >= max_transcript_mean_coverage}
    end
    # Take the longest full-length sequence (s -> sequence, fl -> full-length); compare by length rather than by string content
    seq = cluster.select{|s| s.type == COMPLETE}.sort{|fl1, fl2| fl2.fasta_length <=> fl1.fasta_length}.first
    if seq.nil?
      # No full-length candidate: take the longest sequence among those with the best percent identity
      cluster.sort!{|cl1, cl2| cl2.get_pident <=> cl1.get_pident}
      best_pident = cluster.first.get_pident
      seq = cluster.select{|s| s.get_pident == best_pident}.sort{|s1, s2| s2.fasta_length <=> s1.fasta_length}.first
    end
    seqs << seq
  end
  return seqs
end

def count_cpu(options)
  cpu = 0
  if options[:workers].is_a?(Array)
    cpu = options[:workers].length + 1
  else
    cpu = options[:workers]
  end
  return cpu
end