lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb in seqtrimnext-2.0.51 vs lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb in seqtrimnext-2.0.52

- old
+ new

@@ -1,28 +1,28 @@ #finds the classes that were in the folder 'classes' # ROOT_PATH=File.dirname(File.dirname(File.dirname(__FILE__))) -# +# # $: << File.expand_path(File.join(ROOT_PATH, 'classes')) # $: << File.expand_path(File.join(ROOT_PATH, 'classes','blast')) -# +# # #finds the classes that were in the folder 'plugins' # $: << File.expand_path(File.join(ROOT_PATH, 'plugins')) -# +# # #finds the classes that were in the folder 'plugins' # $: << File.expand_path(File.join(ROOT_PATH, 'actions')) -# +# # #finds the classes that were in the folder 'utils' # $: << File.expand_path(File.join(ROOT_PATH, 'utils')) -# +# # $: << File.expand_path(File.join(ROOT_PATH, 'classes','em_classes')) -# +# # $: << File.expand_path(ROOT_PATH) $: << File.expand_path('~/progs/ruby/gems/seqtrimnext/lib/') -$: << File.expand_path('~/progs/ruby/gems/scbi_mapreduce/lib') +# $: << File.expand_path('~/progs/ruby/gems/scbi_mapreduce/lib') require 'seqtrimnext' $SEQTRIM_PATH = ROOT_PATH @@ -30,11 +30,11 @@ if ENV['BLASTDB']# && Dir.exists?(ENV['BLASTDB']) $FORMATTED_DB_PATH = ENV['BLASTDB'] $DB_PATH = File.dirname($FORMATTED_DB_PATH) else $FORMATTED_DB_PATH = File.expand_path(File.join(ROOT_PATH, "DB",'formatted')) - $DB_PATH = File.expand_path(File.join(ROOT_PATH, "DB")) + $DB_PATH = File.expand_path(File.join(ROOT_PATH, "DB")) end ENV['BLASTDB']=$FORMATTED_DB_PATH OUTPUT_PATH='output_files' @@ -45,256 +45,391 @@ require 'scbi_mapreduce' require 'params' require 'action_manager' require 'plugin_manager' # require 'sequence_with_action' -# +# require 'scbi_fastq' require 'sequence_group' class SeqtrimWorker < ScbiMapreduce::Worker - def process_object(obj) - running_seqs=SequenceGroup.new(obj) - - # execute plugins - @plugin_manager.execute_plugins(running_seqs) - - # add output data - add_output_data(running_seqs) - - return running_seqs - end - - def receive_initial_config(obj) + def process_object(obj) - # Reads the parameters - $WORKER_LOG.info "Params received" -# @params = Params.new(params_path) - @params = obj - - @use_qual=@params.get_param('use_qual') - @use_json=@params.get_param('use_json') - end + running_seqs=SequenceGroup.new(obj.flatten) - def starting_worker + # execute plugins + @plugin_manager.execute_plugins(running_seqs) - # $WORKER_LOG.level = Logger::ERROR - $WORKER_LOG.level = Logger::WARN - $WORKER_LOG.info "Loading actions" + # add output data + add_output_data(running_seqs) - @action_manager = ActionManager.new + return running_seqs + end - $WORKER_LOG.info "Loading plugins" - @plugin_list = @params.get_param('plugin_list') # puts in plugin_list the plugins's array - $WORKER_LOG.info "PLUGIN LIST:" + @plugin_list - - @plugin_manager = PluginManager.new(@plugin_list,@params) # creates an instance from PluginManager. This must storage the plugins and load it - - rescue Exception => e - puts (e.message+ e.backtrace.join("\n")) - - end - - - def closing_worker - - end - - - def add_output_data(obj) - obj.output_text=[] - - obj.each do |seq| - obj.output_text << seq.to_text - write_seq_to_files(obj.output_files,seq, obj.stats) - end - - # @remove seqs since they are not needed anymore to write output files - obj.remove_all_seqs - end - - def add_stat(stats,key,subkey,value,count=1) - - stats[key]={} if !stats[key] - stats[key][subkey]={} if !stats[key][subkey] - stats[key][subkey][value]=0 if !stats[key][subkey][value] - - stats[key][subkey][value]+=count - end + def receive_initial_config(obj) - def write_seq_to_files(files,seq, stats) - # puts stats.to_json - - dir_name,file_name=seq.get_file_tag_path - # puts File.join(dir_name,'sequences_'+file_name) - - # get current inserts - inserts = seq.get_inserts - - # qualities are optional - if @use_qual - qual_inserts = seq.get_qual_inserts - end - - # save json if necessary - if @use_json - json_file(files)<< seq.to_json - end - - # find mids - mid = seq.get_actions(ActionMid).first - - - if (seq.seq_rejected) # sequence rejected - - #save to rejected sequences - message = seq.seq_rejected_by_message - rejected_output_file(files)<<('>'+seq.seq_name+ ' ' + message) - - add_stat(stats,'sequences','rejected',seq.seq_rejected_by_message) - add_stat(stats,'sequences','count','rejected') - + # Reads the parameters + $WORKER_LOG.info "Params received" + # @params = Params.new(params_path) + @params = obj + @tuple_size=@params.get_param('tuple_size') - elsif (inserts.empty?) #sequence with no inserts - message = 'No valid inserts found' - rejected_output_file(files)<<('>'+seq.seq_name+ ' ' + message) - - add_stat(stats,'sequences','rejected',message) - add_stat(stats,'sequences','count','rejected') - - elsif (inserts.count == 2) # sequence with two inserts = PAIRED SEQUENCES - add_stat(stats,'sequences','count','output_seqs_paired') + @use_qual=@params.get_param('use_qual') + @use_json=@params.get_param('use_json') + end - # TODO - Add this stats to full stats - # @@full_stats.add_stats({'sequences' => {'paired' => {'count' => 1}}}) - - if (mid.nil? || (mid.message=='no_MID') ) # without mid - mid_id = 'no_MID' - mid_message = ' No MID found' - else - mid_id = mid.tag_id - mid_message='' - if mid_id != mid_message - mid_message = ' '+mid.message - end - end + def starting_worker - # fasta_file = get_paired_file(mid_id) + # $WORKER_LOG.level = Logger::ERROR + $WORKER_LOG.level = Logger::WARN + $WORKER_LOG.info "Loading actions" - n="#{seq.seq_name}_left" - c="template=#{seq.seq_name} dir=R library=#{mid_id}" - f=inserts[0].reverse.tr('actgACTG','tgacTGAC') - q=[] - if @use_qual - q=qual_inserts[0].reverse - end + @action_manager = ActionManager.new - paired_file(files,dir_name,file_name)<<FastqFile.to_fastq(n,f,q,c) - - - n="#{seq.seq_name}_right" - c="template=#{seq.seq_name} dir=F library=#{mid_id}" - f=inserts[1] - q=[] - if @use_qual - q=qual_inserts[1] - end - - paired_file(files,dir_name,file_name)<<FastqFile.to_fastq(n,f,q,c) - - - elsif (inserts.count == 1) # sequence with one insert + $WORKER_LOG.info "Loading plugins" + @plugin_list = @params.get_param('plugin_list') # puts in plugin_list the plugins's array + $WORKER_LOG.info "PLUGIN LIST:" + @plugin_list - if (mid.nil? || (mid.message=='no_MID') ) # without mid - mid_id = 'no_MID' - mid_message = ' No MID found' - else - mid_id = mid.tag_id - mid_message='' - if mid_id != mid_message - mid_message = ' '+mid.message - end - end + @plugin_manager = PluginManager.new(@plugin_list,@params) # creates an instance from PluginManager. This must storage the plugins and load it - # save fasta and qual in no MID file - has_low_complexity = seq.get_actions(ActionLowComplexity) - - if has_low_complexity.empty? - add_stat(stats,'sequences','count','output_seqs') - - # fasta_file = get_sequence_file(mid_id) - # sff_file=get_sffinfo_file(mid_id) - fasta_file=sequence_file(files,dir_name,file_name) - sff_file=sffinfo_file(files,dir_name,file_name) - else - add_stat(stats,'sequences','count','output_seqs_low_complexity') - - # fasta_file = get_low_complexity_file(mid_id) - # sff_file=get_low_sffinfo_file(mid_id) - fasta_file=low_complexity_file(files,dir_name,file_name) - sff_file=low_sffinfo_file(files,dir_name,file_name) + rescue Exception => e + puts (e.message+ e.backtrace.join("\n")) + + end + + + def closing_worker + + end + + + def add_output_data(obj) + obj.output_text=[] + + if @tuple_size>1 + obj.each_slice(@tuple_size) do |seqs| + + write_seq_to_files_tuple(obj.output_files,seqs, obj.stats) + + seqs.each do |seq| + obj.output_text << seq.to_text end - - q=[] - if @use_qual - q=qual_inserts[0] - end - - n=seq.seq_name - c=mid_message - f=inserts[0] - - fasta_file << FastqFile.to_fastq(n,f,q,c) - - inserts_pos = seq.get_actions(ActionInsert) - - sff_file<< "#{n} #{inserts_pos[0].start_pos+1} #{inserts_pos[0].end_pos+1}" - end - - end - - - # ACCESS TO FILES - def json_file(files) - return get_file(files,File.join(OUTPUT_PATH,'results.json')) - end - - def rejected_output_file(files) - return get_file(files,File.join(OUTPUT_PATH,'rejected.txt')) - end - - - def sequence_file(files, dir_name, file_name) - return get_file(files,File.join(OUTPUT_PATH,dir_name,'sequences_'+file_name+'.fastq')) - end - - def paired_file(files, dir_name, file_name) - return get_file(files,File.join(OUTPUT_PATH,dir_name,'paired_'+file_name+'.fastq')) + else + obj.each do |seq| + write_seq_to_files_normal(obj.output_files,seq, obj.stats) + obj.output_text << seq.to_text + end end - def low_complexity_file(files, dir_name, file_name) - return get_file(files,File.join(OUTPUT_PATH,dir_name,'low_complexity_'+file_name+'.fastq')) + # @remove seqs since they are not needed anymore to write output files + obj.remove_all_seqs + end + + def add_stat(stats,key,subkey,value,count=1) + + stats[key]={} if !stats[key] + stats[key][subkey]={} if !stats[key][subkey] + stats[key][subkey][value]=0 if !stats[key][subkey][value] + + stats[key][subkey][value]+=count + end + + def write_seq_to_files_tuple(files,seqs, stats) + + + seq1=seqs[0] + seq2=seqs[1] + + dir_name,file_name,priority=seq1.get_file_tag_path + dir_name2,file_name2,priority2=seq2.get_file_tag_path + + # both paired sequences must go in same file, there are priorities + if (dir_name!=dir_name2) || (file_name!=file_name2) + if priority2>priority + dir_name=dir_name2 + file_name=file_name2 + end end - - def sffinfo_file(files, dir_name, file_name) - return get_file(files,File.join(OUTPUT_PATH,dir_name,'sff_info_'+file_name+'.txt')) + + # get current inserts + inserts1 = seq1.get_inserts + inserts2 = seq2.get_inserts + + # qualities are optional + if @use_qual + qual_inserts1 = seq1.get_qual_inserts + qual_inserts2 = seq2.get_qual_inserts end + + + + # save json if necessary + if @use_json + json_file(files)<< seq1.to_json + json_file(files)<< seq2.to_json + end - def low_sffinfo_file(files, dir_name, file_name) - return get_file(files,File.join(OUTPUT_PATH,dir_name,'low_complexity_sff_info_'+file_name+'.txt')) + # find mids + mid1 = seq1.get_actions(ActionMid).first + mid2 = seq2.get_actions(ActionMid).first + + + if !inserts1.empty? && !inserts2.empty? # both have inserts + # save_two_inserts(files,seq, stats,inserts,qual_inserts,mid,dir_name,file_name) + save_two_inserts_tuple(files,seq1,seq2, stats,inserts1,inserts2,qual_inserts1,qual_inserts2,mid1,dir_name,file_name) + else + save_rejected_empty_or_single(files,seq1, stats,inserts1,qual_inserts1,mid1,dir_name,file_name) + save_rejected_empty_or_single(files,seq2, stats,inserts2,qual_inserts2,mid2,dir_name,file_name) end - - def get_file(files,fn) - res=files[fn] - - if !res - files[fn]=[] - res=files[fn] + + end + + def save_two_inserts_tuple(files,seq1,seq2, stats,inserts1,inserts2,qual_inserts1,qual_inserts2,mid,dir_name,file_name) + + add_stat(stats,'sequences','count','output_seqs_paired') + add_stat(stats,'sequences','count','output_seqs_paired') + + mid_id,mid_message=get_mid_message(mid) + + # save left read + n="#{seq1.seq_name}" + c=seq1.get_comment_line # "template=#{seq1.seq_name} dir=R library=#{mid_id}" + f=inserts1[0]#.reverse.tr('actgACTG','tgacTGAC') + q=[] + if @use_qual + q=qual_inserts1[0] #.reverse + end + + paired_file_ilu1(files,dir_name,file_name)<<FastqFile.to_fastq(n,f,q,c) + + # save right read + n="#{seq2.seq_name}" + c=seq2.get_comment_line # "template=#{seq2.seq_name} dir=F library=#{mid_id}" + f=inserts2[0] + q=[] + if @use_qual + q=qual_inserts2[0] + end + + paired_file_ilu2(files,dir_name,file_name)<<FastqFile.to_fastq(n,f,q,c) + + end + + + def save_rejected_empty_or_single(files,seq, stats,inserts,qual_inserts,mid,dir_name,file_name) + if (seq.seq_rejected) # save to rejected sequences + save_rejected_seq(files,seq, stats) + elsif (inserts.empty?) #sequence with no inserts + save_empty_insert(files,seq, stats) + elsif (inserts.count == 1) # sequence with one insert + save_one_insert(files,seq, stats,inserts,qual_inserts,mid,dir_name,file_name) + end + end + + + # SAVE NORMAL =============================== + def save_rejected_seq(files,seq, stats) + # message = seq.seq_rejected_by_message + message= seq.get_comment_line + rejected_output_file(files)<<('>'+seq.seq_name+ ' ' + message) + + add_stat(stats,'sequences','rejected',seq.seq_rejected_by_message) + add_stat(stats,'sequences','count','rejected') + end + + def save_empty_insert(files,seq, stats) + seq.seq_rejected=true + seq.seq_rejected_by_message='short insert' + + message = 'No valid inserts found' + + rejected_output_file(files)<<('>'+seq.seq_name+ ' ' + message) + + add_stat(stats,'sequences','rejected',message) + add_stat(stats,'sequences','count','rejected') + + end + + def get_mid_message(mid) + if (mid.nil? || (mid.message=='no_MID') ) # without mid + mid_id = 'no_MID' + mid_message = ' No MID found' + else + mid_id = mid.tag_id + mid_message='' + if mid_id != mid_message + mid_message = ' '+mid.message end + end + return mid_id,mid_message + end + + def save_two_inserts(files,seq, stats,inserts,qual_inserts,mid,dir_name,file_name) + add_stat(stats,'sequences','count','output_seqs_paired') + + mid_id,mid_message=get_mid_message(mid) + + # save left read + n="#{seq.seq_name}_left" + c="template=#{seq.seq_name} dir=R library=#{mid_id} #{seq.get_comment_line}" + f=inserts[0].reverse.tr('actgACTG','tgacTGAC') + q=[] + if @use_qual + q=qual_inserts[0].reverse + end + + paired_file(files,dir_name,file_name)<<FastqFile.to_fastq(n,f,q,c) + + # save right read + n="#{seq.seq_name}_right" + c="template=#{seq.seq_name} dir=F library=#{mid_id} #{seq.get_comment_line}" + f=inserts[1] + q=[] + if @use_qual + q=qual_inserts[1] + end + + paired_file(files,dir_name,file_name)<<FastqFile.to_fastq(n,f,q,c) + + end + + def save_one_insert(files,seq, stats,inserts,qual_inserts,mid,dir_name,file_name) + mid_id,mid_message=get_mid_message(mid) + + # save fasta and qual in no MID file + has_low_complexity = seq.get_actions(ActionLowComplexity) + + if has_low_complexity.empty? + add_stat(stats,'sequences','count','output_seqs') + + fasta_file=sequence_file(files,dir_name,file_name) + sff_file=sffinfo_file(files,dir_name,file_name) + else + add_stat(stats,'sequences','count','output_seqs_low_complexity') + + fasta_file=low_complexity_file(files,dir_name,file_name) + sff_file=low_sffinfo_file(files,dir_name,file_name) + end + + q=[] + if @use_qual + q=qual_inserts[0] + end + + n=seq.seq_name + c=mid_message + + seq_comments=seq.get_comment_line + if !seq_comments.strip.empty? + c=seq_comments + c + end + + f=inserts[0] + + fasta_file << FastqFile.to_fastq(n,f,q,c) + + inserts_pos = seq.get_actions(ActionInsert) + + sff_file<< "#{n} #{inserts_pos[0].start_pos+1} #{inserts_pos[0].end_pos+1}" + + + end + + + def write_seq_to_files_normal(files,seq, stats) + + # puts stats.to_json + + dir_name,file_name,priority=seq.get_file_tag_path + # puts File.join(dir_name,'sequences_'+file_name) + + # get current inserts + inserts = seq.get_inserts + + # qualities are optional + if @use_qual + qual_inserts = seq.get_qual_inserts + end + + # save json if necessary + if @use_json + json_file(files)<< seq.to_json + end + + # find mids + mid = seq.get_actions(ActionMid).first + + + if (seq.seq_rejected) # save to rejected sequences + save_rejected_seq(files,seq, stats) - return res + elsif (inserts.empty?) #sequence with no inserts + save_empty_insert(files,seq, stats) + + elsif (inserts.count == 2) # sequence with two inserts = PAIRED SEQUENCES + save_two_inserts(files,seq, stats,inserts,qual_inserts,mid,dir_name,file_name) + + elsif (inserts.count == 1) # sequence with one insert + save_one_insert(files,seq, stats,inserts,qual_inserts,mid,dir_name,file_name) end - + + end + + + + + + # ACCESS TO FILES + + def json_file(files) + return get_file(files,File.join(OUTPUT_PATH,'results.json')) + end + + def rejected_output_file(files) + return get_file(files,File.join(OUTPUT_PATH,'rejected.txt')) + end + + + def sequence_file(files, dir_name, file_name) + return get_file(files,File.join(OUTPUT_PATH,dir_name,'sequences_'+file_name+'.fastq')) + end + + def paired_file(files, dir_name, file_name) + return get_file(files,File.join(OUTPUT_PATH,dir_name,'paired_'+file_name+'.fastq')) + end + + def paired_file_ilu1(files, dir_name, file_name) + return get_file(files,File.join(OUTPUT_PATH,dir_name,'paired_1_'+file_name+'.fastq')) + end + + def paired_file_ilu2(files, dir_name, file_name) + return get_file(files,File.join(OUTPUT_PATH,dir_name,'paired_2_'+file_name+'.fastq')) + end + + + def low_complexity_file(files, dir_name, file_name) + return get_file(files,File.join(OUTPUT_PATH,dir_name,'low_complexity_'+file_name+'.fastq')) + end + + def sffinfo_file(files, dir_name, file_name) + return get_file(files,File.join(OUTPUT_PATH,dir_name,'sff_info_'+file_name+'.txt')) + end + + def low_sffinfo_file(files, dir_name, file_name) + return get_file(files,File.join(OUTPUT_PATH,dir_name,'low_complexity_sff_info_'+file_name+'.txt')) + end + + def get_file(files,fn) + res=files[fn] + + if !res + files[fn]=[] + res=files[fn] + end + + return res + end + end