$: << File.expand_path(File.join(File.dirname(__FILE__))) require 'scbi_mapreduce' require 'scbi_blast' require 'sequence' require 'fl_string_utils' require 'test_code' require 'types' require 'artifacts' require 'blast_functions' require 'exonerate_result' require 'scbi_fasta' require 'mapping' require 'fl_analysis' include FlAnalysis require 'nc_rna' include NcRna class MyWorker < ScbiMapreduce::Worker ##################################################################################### # WORKER FUNCTIONS ##################################################################################### def receive_initial_config(manager_options) # Reads the parameters # $WORKER_LOG.info "Params received: #{obj.to_json}" @options = manager_options $verbose = manager_options[:verbose] end def process_object(initial_obj) task = initial_obj.first.keys.first obj = nil if task == :fln obj = initial_obj.map{|hash| hash[:fln]} # Punto de arranque de FLN $WORKER_LOG.info "Task: #{task}. Processing chunk: #{obj.first.seq_name}" full_lenghter2(obj) elsif task == :mapping obj = initial_obj.first.values.first $WORKER_LOG.info "Task: #{task}. Processing chunk: #{obj}" obj = map_transcriptome(obj) end return {task => obj} end def closing_worker end ##################################################################################### # MAPPING METHODS ##################################################################################### def map_transcriptome(initial_obj) ref_file = initial_obj prefix = File.basename(ref_file, '.fasta') mapping2 = Mapping.new( ref_fasta_path: File.join(@options[:temp_map_folder], ref_file), threads: 1, temp_folder: @options[:temp_map_folder] ) $WORKER_LOG.info "Do bowtie ref" mapping2.do_ref( name: "#{prefix}_ref", log: "#{prefix}_reference_log" ) $WORKER_LOG.info "Do bowtie mapping" mapping2.do_map( files: @options[:files2map], command: 'bowtie2 -p /THREADS/ -x /REFERENCE/ -a', paired_pipe: '| samtools view -bS -f 2 | samtools sort -o /OUTPUT/', single_pipe: '| samtools view -bS -F 4 | samtools sort -o /OUTPUT/', additional_paired_flags: '--no-mixed', output: File.join(@options[:temp_map_folder], "#{prefix}_map_data"), log: File.join(@options[:temp_map_folder], "#{prefix}_mapping_log"), ) $WORKER_LOG.info "Do samtools ref" mapping2.index $WORKER_LOG.info "Do idxstats" mapping2.idxstats $WORKER_LOG.info "Do mpileup and coverage analysis" map_object2 = mapping2.mpileup('--ff' => 'UNMAP,QCFAIL') $WORKER_LOG.info "Finished coverage analysis" return map_object2 end ##################################################################################### # FLN FUNCTIONS ##################################################################################### #---------------------------------------------------------------------------------- # MAIN FUNCTION #---------------------------------------------------------------------------------- def full_lenghter2(seqs) #seqs.map{|seq| seq.change_degenerated_nt!} # Clean degenerated nt check_seqs = seqs if !@options[:files2map].empty? && @options[:remove_unmapped] check_seqs = check_mapping(seqs) end # User database #-------------------------------------------- # if the user has included his own database in the parameters entry, # the location of the database is tested, and blast and the results analysis is done if @options[:user_db] user_db = File.basename(@options[:user_db]) check_seqs = check_prot_db(check_seqs, @options[:user_db], 'blastx', 1, user_db, @options[:blast]) end # UniProt (sp) #-------------------------------------------- if @options[:acess_db].include?('s') sp_db = 'sp_'+@options[:tax_group] sp_path = File.join(sp_db, 'sp_'+@options[:tax_group]) check_seqs = check_prot_db(check_seqs, sp_path, 'blastx', 1, sp_db, @options[:blast]) end # UniProt (tr) #-------------------------------------------- if @options[:acess_db].include?('t') tr_db = 'tr_'+@options[:tax_group] tr_path = File.join(tr_db,'tr_'+@options[:tax_group]) check_seqs = check_prot_db(check_seqs, tr_path, 'blastx', 1, tr_db, @options[:blast]) end # nc RNA #-------------------------------------------- if @options[:acess_db].include?('n') check_seqs = seqs.select{|s| s.type == UNKNOWN} ncrna_path = File.join('nc_rna_db','ncrna') check_ncRNA(check_seqs, ncrna_path, 'blastn', 1e-3) end # Test Code #-------------------------------------------- # the sequences without a reliable similarity with an orthologue are processed with Test Code if @options[:acess_db].include?('c') check_seqs = seqs.select{|s| s.type == UNKNOWN } check_testcode(check_seqs) end end #---------------------------------------------------------------------------------- # END MAIN #---------------------------------------------------------------------------------- def check_mapping(seqs) new_seqs = [] seqs.each do |s| artifact?(s, nil, 'mapping', '', @options, new_seqs) end seqs.concat(new_seqs) return seqs.select{|s| !s.ignore } end def check_prot_db(seqs, db_path, blast_type, evalue, db_name, additional_blast_options) if $verbose > 0 puts "\e[33m=========================================\e[0m", "\e[33m#{db_name}\t#{seqs.length}\e[0m", "\e[33m=========================================\e[0m" end my_blast = run_blast(seqs, db_path, blast_type, evalue, additional_blast_options, @options[:exonerate]) # do blast new_seqs = [] seqs.each_with_index do |seq, i| # parse blast puts "\e[31m#{seq.seq_name}\e[0m" if $verbose > 0 ## VERBOSE if !my_blast.querys[i].hits.first.nil? status='Artifact analysis' begin check_blast(seq, my_blast.querys[i]) # Check if seq and query are the same if !artifact?(seq, my_blast.querys[i], db_name, db_path, @options, new_seqs) status = 'Full length analysis' best_hits = filter_hits(my_blast.querys[i], 100) record_position = seqs.index(seq) seq = search_best_orf_y_fl(seq, best_hits, @options, db_name)# FULL LENGTH ANALYSIS seqs[record_position]= seq #Replace the old seq by the new seq seq.area_without_annotation? if @options[:chimera] != 'd' && !seq.hit.nil? end rescue Exception => e rescue_sequence(e, seq, status) end end end seqs.concat(new_seqs) check_seqs = seqs.select{|s| !s.ignore || (s.type == COMPLETE && s.area_without_annotation)} return check_seqs end # ejecuta blast utilizando los parametros fichero de entrada, base de datos, tipo de blast y evalue def run_blast(input, database, blast_type, evalue, additional_blast_options, do_exonerate, filter = TRUE) if !input.empty? && !input.nil? $WORKER_LOG.info "DB: #{File.basename(database)} #{input.length}" blast = BatchBlast.new("-db #{database}", blast_type, "-evalue #{evalue} #{additional_blast_options}") chunk_name = input.first.seq_name.gsub(/\W+/,'_') file_path = File.join(@options[:temp], File.basename(database)+'_'+chunk_name) if @options[:hdd] #Write/parse blast on Disk file_name = file_path+'.blast' #Each blast is identified with database_name and first sequence's name on chunk if !File.exists?(file_name) blast_result = blast.do_blast_seqs(input, :table, TRUE, file_name) else blast = nil blast_result=BlastTableResult.new(file_name) end else blast_result = blast.do_blast_seqs(input, :table) end refine_analysis_with_exonerate(blast_result, input, file_path, database, @options[:ident]) if do_exonerate if filter #Delete hits with low identity, this enables ident filter on normal FLN execution and disables it when RepTrans with my_workerEST clean_by_identity(blast_result, @options[:ident]) #clean_by_query_length_match(blast_result, 1000)#60 is min length of the match in nt end $WORKER_LOG.info "#BLAST ENDED" return blast_result else return nil end end def rescue_sequence(e, seq, status) seq.save_fasta = FALSE seq.ignore = TRUE seq.type = FAILED puts '-- '+seq.seq_name+' FAILED ANALYSIS -- '+status, e.message, e.backtrace.join("\n") end def check_ncRNA(check_seqs, ncrna_path, blast_type, evalue) my_blast = run_blast(check_seqs, ncrna_path, blast_type, evalue, '', FALSE, nil) if !my_blast.nil? check_seqs.each_with_index do |seq,i| find_nc_rna(seq, my_blast.querys[i]) end end end def check_testcode(check_seqs) check_seqs.map{|seq| TestCode.new(seq)} end def check_blast(seq, blast_query) if seq.seq_name != blast_query.query_def # used to detect if the sequence and the blast are from different query raise "BLAST query name and sequence are different" end end def search_best_orf_y_fl(seq, best_hits, options, db_name) warning = nil if best_hits.length > 1 all_options = [] best_hits.map{|hit| new_seq = seq.clone puts "\n\t\e[35mCheck protein #{hit.first.acc}\e[0m" if $verbose > 1 ## VERBOSE analiza_orf_y_fl(new_seq, hit, options, db_name) all_options << new_seq } all_options.select!{|option| option.type > UNKNOWN} best_type = all_options.map{|option| option.type}.min best_options = all_options.select{|option| option.type == best_type} filtered_options = best_options.select{|option| option.status} # Select sure options filtered_options = best_options if filtered_options.empty? # All options are putative #best_option = filtered_options.first # select hit with big perc ident query best_option = filtered_options.sort{|seq1, seq2| seq2.hit.ident <=> seq1.hit.ident}.first # select hit with big perc ident query if !all_options.empty? # There is one sequence unless warning = [['PositionResult', all_options.index(best_option)+1]] else best_option = seq end else analiza_orf_y_fl(seq, best_hits.first, options, db_name) best_option = seq warning = 'SingleResult' end if seq.type == FAILED seq.type = UNKNOWN seq.ignore = FALSE else best_option.warnings(warning) if !warning.nil? end return best_option end def refine_analysis_with_exonerate(blast_result, input, file_path, database, ident) querys_stats = hits_statistics(blast_result) if !querys_stats.empty? querys, targets = select_sequences(querys_stats) if !querys.empty? && !targets.empty? write_querys(querys, input, file_path) write_targets(targets, file_path, database) file_name = file_path + '.exonerate' system("exonerate --useaatla 0 --showalignment 0 --model protein2dna #{file_path+'.prot'} #{file_path+'.dna'} > #{file_name}") if !File.exists?(file_name) seqs = {} querys.map{|position| seqs[input[position].seq_name] = input[position].seq_fasta} exonerate_result = ExonerateResult.new(file_name, seqs, get_prot_sequences(file_path)) clean_subjec_ids_name(exonerate_result) replace_hits(blast_result, exonerate_result) end end end def replace_hits(blast_result, exonerate_result) blast_result.querys.each do |query| exonerate_query = blast_result.find_query(exonerate_result.querys, query.query_def) if !exonerate_query.nil? blast_hits = cluster_hsps(query.hits) exonerate_hits = cluster_hsps(exonerate_query.hits) blast_hits.map! {|hit| num_hsps = hit.length if num_hsps > 1 exonerate_hit = find_hit(hit.first.acc, exonerate_hits) if !exonerate_hit.nil? && exonerate_hit.length < num_hsps #We replace hits with by hits with less hsps because we supose that exonerate has merged them exonerate_hit.map{|ex_hit| ex_hit.s_len = hit.first.s_len ex_hit.q_len = hit.first.q_len ex_hit.definition = hit.first.definition } exonerate_hit else hit end else hit end } query.hits = blast_hits.flatten end end end def clean_subjec_ids_name(exonerate_result) exonerate_result.querys.each do |query| query.hits.map{|hit| hit.subject_id.sub!('lcl|','') hit.acc.sub!('lcl|','') } end end def hits_statistics(blast_result) querys_stats = [] blast_result.querys.each_with_index do |query, ind| if !query.hits.empty? query.hits.each do |hit| if querys_stats[ind].nil? querys_stats[ind] = {hit.acc => 1} else if querys_stats[ind][hit.acc].nil? querys_stats[ind][hit.acc] = 1 else querys_stats[ind][hit.acc] += 1 end end end end end return querys_stats end def select_sequences(querys_stats) querys = [] targets = [] querys_stats.each_with_index do |hits, query_position| if !hits.nil? hits.each do |hit_id, n_hsps| if n_hsps > 1 querys << query_position if !querys.include?(query_position) targets << hit_id if !targets.include?(hit_id) end end end end return querys, targets end def write_querys(querys, input, file_path) file_name = file_path+'.dna' if !File.exists?(file_name) fasta = File.open(file_name, 'w') querys.each do |query_position| seq = input[query_position] fasta.puts ">#{seq.seq_name}\n#{seq.seq_fasta}" end fasta.close end end def write_targets(targets, file_path, database) puts "-- This batch has not unigenes for exonerate: #{file_path}" if targets.empty? file_name = file_path+'.prot' if !File.exists?(file_name) targets.each_slice(400) do |slice| #This loop avoids shell buffered out when the list of entries is huge entries = slice.join(',') system("blastdbcmd -db #{database} -entry #{entries} >> #{file_name}") end end end def get_prot_sequences(file_path) sequences = {} file_name = file_path+'.prot' fqr = FastaQualFile.new(file_name) fqr.each do |name,seq_fasta| sequences[name] = seq_fasta end fqr.close return sequences end end