###################################### # Author: Almudena Bocinos Rioboo # This is the main class. ###################################### require 'extract_stats' # $: << File.expand_path('~/progs/ruby/gems/scbi_drb/lib') require 'scbi_mapreduce' require 'seqtrim_work_manager' require 'action_manager' # SEQTRIM_VERSION_REVISION=27 # SEQTRIM_VERSION_STAGE = 'b' # $SEQTRIM_VERSION = "2.0.0#{SEQTRIM_VERSION_STAGE}#{SEQTRIM_VERSION_REVISION}" class Seqtrim # First of all, reads the file's parameters, where are the values of all parameters and the 'plugin_list' that specifies the order of execution from the plugins. # # Secondly, loads the plugins in a folder . # # Thirdly, checks if parameter's file have the number of parameters necessary for every plugin that is going to be executed. # # After that, creates a thread's pool of a determinate number of workers, e.g. 10 threads, # reads the sequences from files 'fasta' , until now without qualities, # and executes the plugins over the sequences in the pool of threads def get_cd_hit_cmd(cd_hit_input_file,workers,init_file_path) num_cpus_cdhit=1 cmd='' # if workers is an integer, reduce it by one in the server begin Integer(workers) num_cpus_cdhit = workers cmd = "cd-hit-454 -i #{cd_hit_input_file} -o clusters.fasta -M #{num_cpus_cdhit*1000} -T #{num_cpus_cdhit} > cd-hit-454.out" rescue Exception => exception #not an integer, send via ssh to other machine # puts exception worker_hash={};workers.map{|e| worker_hash[e] = (worker_hash[e]||0) +1} max_worker = worker_hash.sort_by{|k,v| -v}.first puts "Found these workers: #{worker_hash.sort_by{|k,v| -v}}" num_cpus_cdhit=max_worker[1] init='' cd='' cmd = "cd-hit-454 -i #{cd_hit_input_file} -o clusters.fasta -M #{num_cpus_cdhit*1000} -T #{num_cpus_cdhit} > cd-hit-454.out" # worker is different to current machine, send over ssh if max_worker[0]!= workers[0] if File.exists?(init_file_path) init=". #{init_file_path}; " end pwd=`pwd`.chomp cd ='' if File.exists?(pwd) cd = "cd #{pwd}; " end cmd = "ssh #{max_worker[0]} \"#{init} #{cd} #{cmd}\"" end end return cmd end def check_global_params(params) errors=[] # check plugin list comment='Plugins applied to every sequence, separated by commas. Order is important' # default_value='PluginLowHighSize,PluginMids,PluginIndeterminations,PluginAbAdapters,PluginContaminants,PluginLinker,PluginVectors,PluginLowQuality' # params.check_param(errors,'plugin_list','String',default_value,comment) params.check_param(errors,'plugin_list','PluginList',nil,comment) comment='Should SeqTrimNext analysis be based on NGS? (if setting to false, a classic Sanger sequencing is considered)' default_value='true' params.check_param(errors,'next_generation_sequences','String',default_value,comment) comment='Remove duplicated (clonal) sequences (using CD-HIT 454)' default_value='true' params.check_param(errors,'remove_clonality','String',default_value,comment) comment='Minimum insert size for every trimmed sequence' default_value = 40 params.check_param(errors,'min_insert_size_trimmed','Integer',default_value,comment) comment='Minimum insert size for each end of paired-end reads; true paired-ends have both single-ends longer than this value' default_value = 40 params.check_param(errors,'min_insert_size_paired','Integer',default_value,comment) comment='Do not reject unexpectedly long sequences found in the raw data' default_value='true' params.check_param(errors,'accept_very_long_sequences','String',default_value,comment) comment='Seqtrim version' default_value=Seqtrimnext::SEQTRIM_VERSION params.check_param(errors,'seqtrim_version','String',default_value,comment) if !errors.empty? $LOG.error 'Please, define the following global parameters in params file:' errors.each do |error| $LOG.error ' -' + error end #end each end #end if return errors.empty? end def initialize(options) # ,options[:fasta],options[:qual],,,, params_path=options[:template] ip=options[:server_ip] port=options[:port] workers=options[:workers] only_workers=options[:only_workers] chunk_size = options[:chunk_size] use_json = options[:json] # it is the server part if !only_workers then sequence_reader = nil cd_hit_input_file = nil # TODO - FIX seqtrim to not iterate two times over input, so STDIN can be used # open sequence reader and expand input files paths if options[:fastq] if options[:fastq]=='-' seqs_path = STDIN else seqs_path = File.expand_path(options[:fastq]) end cd_hit_input_file = seqs_path sequence_reader = FastqFile.new(seqs_path,'r',:sanger, true) # cd_hit_input_file = 'cd-hit-input.fasta' cd_hit_input_file = seqs_path # $LOG.info "Converting input file for cd-hit-454" # $LOG.info "Conversion done" else seqs_path = File.expand_path(options[:fasta]) cd_hit_input_file = seqs_path qual_path = File.expand_path(options[:qual]) if qual_path sequence_reader = FastaQualFile.new(options[:fasta],options[:qual],true) end $LOG.info "Loading params" # Reads the parameter's file params = Params.new(params_path) $LOG.info "Checking global params" if !check_global_params(params) exit end # Load actions $LOG.info "Loading actions" action_manager = ActionManager.new() # load plugins plugin_list = params.get_param('plugin_list') # puts in plugin_list the plugins's array $LOG.info "Loading plugins [#{plugin_list}]" plugin_manager = PluginManager.new(plugin_list,params) # creates an instance from PluginManager. This must storage the plugins and load it # load plugin params $LOG.info "Check plugin params" if !plugin_manager.check_plugins_params(params) then $LOG.error "Plugin check failed" # save used params to file params.save_file('used_params.txt') exit end if !Dir.exists?(OUTPUT_PATH) Dir.mkdir(OUTPUT_PATH) end $LOG.info "Calculatings stats" # Extract global stats ExtractStats.new(sequence_reader,params) # save used params to file params.save_file(File.join(OUTPUT_PATH,'used_params.txt')) piro_on = (params.get_param('next_generation_sequences')=='true') # format blast database with truncated file #MakeBlastDb.format_db(es.truncated_file_path,File.basename(es.truncated_file_path,File.extname(es.truncated_file_path)),'./') if piro_on # leer mids params.load_mids(File.join($FORMATTED_DB_PATH,'mids.fasta')) params.load_linkers(File.join($FORMATTED_DB_PATH,'linkers.fasta')) #execute cd-hit if params.get_param('remove_clonality')=='true' cmd=get_cd_hit_cmd(cd_hit_input_file,workers,File.join($SEQTRIM_PATH,'init_env')) $LOG.info "Executing cd-hit-454: #{cmd}" if !File.exists?('clusters.fasta.clstr') system(cmd) end if File.exists?('clusters.fasta.clstr') params.load_repeated_seqs('clusters.fasta.clstr') else $LOG.error("Exiting due to not found clusters.fasta.clstr. Maybe cd-hit failed. Check cd-hit.out") exit end end ############ SCBI DRB ########### # port = 50000 # ip = "10.250.255.6" # port = 50000 # ip = "localhost" # # workers=20 # only_workers=false # launch work manager end # end only_workers custom_worker_file = File.join(File.dirname(__FILE__), 'em_classes','seqtrim_worker.rb') $LOG.info "Workers:\n#{workers}" if only_workers then worker_launcher = ScbiMapreduce::WorkerLauncher.new(ip,port, workers, custom_worker_file, STDOUT) worker_launcher.launch_workers_and_wait else $LOG.info 'Starting server' SeqtrimWorkManager.init_work_manager(sequence_reader, params,chunk_size,use_json) begin cpus=1 if RUBY_PLATFORM.downcase.include?("darwin") cpus=`hwprefs -cpu_count`.chomp.to_i else cpus=`grep processor /proc/cpuinfo |wc -l`.chomp.to_i end rescue cpus=1 end # if workers is an integer, reduce it by one (because of the server) begin Integer(workers) if workers>1 && workers1 && workers.count