#
# The command line interface class
#
require 'optparse'
require 'zlib'
require 'rubygems/package'
# NOTE(review): presumably this was pulled in only to get Gem::Package loaded
# transitively; 'rubygems/package' above is the direct dependency. Confirm
# before removing.
require 'rubygems/test_utilities'
require 'mabmapper/elasticsearch_writer'
require 'mabmapper/tar_writer'

module Mabmapper
  #
  # Command line front end: parses the options, loads the normalization
  # engine and converts every file given on the command line.
  #
  # NOTE(review): relies on ActiveSupport core extensions (String#classify,
  # String#constantize, Object#present?) being loaded elsewhere.
  #
  class Cli
    ROOT_DIR = Dir.pwd

    #
    # Runs the complete CLI workflow: option parsing, engine loading and
    # file processing. May terminate the process via `exit`.
    #
    def initialize
      @options = {}
      parse_command_line!
      load_engine!
      process_files!
    end

    protected

    #
    # Parse command line options
    #
    # Fills @options with the defaults and whatever was given on the command
    # line. Prints the usage text and exits when no FILES remain in ARGV or
    # when an option cannot be parsed.
    #
    def parse_command_line!
      optparse = OptionParser.new do |opts|
        opts.banner = "Usage: mabmapper [options] FILES"

        @options[:output_dir] = nil
        opts.on('-o', '--output DIR', 'Output directory') do |dir|
          @options[:output_dir] = dir
        end

        @options[:debug] = false
        opts.on('-d', '--debug', "Debug mode on.") do
          @options[:debug] = true
        end

        @options[:debug_fields] = []
        opts.on('-f', '--debug-fields a,b,c', Array, "If debug mode is on only fields matching the given names will be debugged.") do |fields|
          @options[:debug_fields] = fields
        end

        @options[:silent] = false
        opts.on('-s', '--silent', "Do not output anything on the console") do
          @options[:silent] = true
        end

        #@options[:engine] = "mabmapper/aleph_mab_xml"
        #opts.on( '-e', '--engine ENGINE', 'Normalization engine (Default: aleph_mab_xml)' ) do |engine|
        #  @options[:engine] = engine
        #end

        @options[:no_of_procs] = 1
        opts.on('-n', '--number-of-procs NUM', Integer, "Use NUM parallel procs [Default: 1]") do |n|
          @options[:no_of_procs] = n
        end

        @options[:writer] = Mabmapper::TarWriter
        opts.on('-w', '--writer WRITER', "Use specified writer (elasticsearch|tar) [Default: tar]") do |writer|
          # Any value other than "elasticsearch" keeps the default tar writer.
          @options[:writer] = Mabmapper::ElasticSearchWriter if writer.downcase == 'elasticsearch'
        end

        opts.on('-h', '--help', 'Display this screen') do
          puts opts
          exit
        end
      end

      optparse.parse!

      # Check for required file arguments
      if ARGV.empty?
        puts optparse.help
        exit
      end
    rescue OptionParser::ParseError => e
      puts e.message
      puts optparse.help
      exit
    end

    #
    # Load normalization engine
    #
    # Requires the engine file, instantiates the class derived from its path
    # and stores it in @engine. Exits with status 1 if the file cannot be
    # loaded.
    #
    def load_engine!
      engine_file = "mabmapper/aleph_mab_xml_engine" # TODO: Make me configurable
      require engine_file
      engine_class_name = engine_file.classify
      @engine = engine_class_name.constantize.new
      log "#{engine_class_name} loaded!"
    rescue LoadError
      log "Error loading engine #{engine_file}."
      exit 1
    end

    #
    # Process the input files
    #
    # Splits ARGV into at most --number-of-procs consecutive chunks, forks one
    # child process per chunk and waits for all children to terminate.
    #
    def process_files!
      # A process count of zero or less would leave every file unprocessed,
      # so always use at least one worker.
      max_processes = [@options[:no_of_procs], 1].max

      # Prepare joblists for each process. each_slice never yields an empty
      # chunk, so no idle child is forked when there are fewer files than
      # processes.
      step = [(ARGV.size / max_processes.to_f).ceil, 1].max
      joblist = ARGV.each_slice(step).to_a

      # Run joblist
      joblist.each_with_index do |list, index|
        fork do
          # The name is picked up by #log to prefix the child's messages.
          Thread.current[:name] = "Process #{index}"
          list.each { |file| process_file(file) }
        end
      end

      Process.waitall
      log "FINISHED"
    end

    private

    #
    # Dispatch a single input file to the matching handler based on its
    # file name suffix.
    #
    def process_file(file)
      if file.end_with?('.tar')
        process_tar_file(file)
      elsif file.end_with?('.tar.gz')
        process_tar_gz_file(file)
      else
        process_default_file(file)
      end
    end

    #
    # Convert every regular file inside an uncompressed tar archive. If an
    # output directory is configured the results are handed to the configured
    # writer; all opened handles are released even if processing fails.
    #
    def process_tar_file(file)
      out_io = nil
      writer = nil

      if (dir = output_dir)
        out_file = @options[:writer].out_file(dir, File.basename(file))
        out_io = File.open(out_file, 'wb')
        writer = @options[:writer].new(out_io)
      end

      File.open(file, 'rb') do |tar_io|
        each_converted_entry(Gem::Package::TarReader.new(tar_io), file) do |entry, result|
          next unless writer

          writer.add_file(entry.full_name, 0644) { |f| f.write(result.to_xml) }
        end
      end
    ensure
      begin
        writer.close if writer
      ensure
        # The writer is not assumed to close the underlying file itself.
        out_io.close if out_io && !out_io.closed?
      end
    end

    #
    # Convert every regular file inside a gzip compressed tar archive. If an
    # output directory is configured the results are handed to the configured
    # writer through a gzip stream; all opened handles are released even if
    # processing fails.
    #
    def process_tar_gz_file(file)
      gzip_writer = nil
      writer = nil

      if (dir = output_dir)
        out_file = @options[:writer].out_file(dir, File.basename(file), will_be_gziped: true)
        gzip_writer = Zlib::GzipWriter.new(File.open(out_file, 'wb'))
        writer = @options[:writer].new(gzip_writer)
      end

      # The block form closes the gzip reader when the block is left.
      Zlib::GzipReader.open(file) do |gzip_reader|
        each_converted_entry(Gem::Package::TarReader.new(gzip_reader), file) do |entry, result|
          next unless writer

          xml_result = result.to_xml
          # The size is passed up front, so the entry can be written to the
          # non-seekable gzip stream.
          writer.add_file_simple(entry.full_name, 0644, xml_result.bytesize) { |f| f.write(xml_result) }
        end
      end
    ensure
      begin
        writer.close if writer
      ensure
        # Closing the gzip writer also closes the wrapped output file.
        gzip_writer.close if gzip_writer && !gzip_writer.closed?
      end
    end

    #
    # Run the engine on every regular file of the given tar reader and yield
    # the entry together with its result. Logs progress before and, in debug
    # mode, the (optionally field filtered) result after the block has run.
    #
    def each_converted_entry(tar_reader, archive)
      tar_reader.each do |entry|
        next unless entry.file?

        log "Processing file #{entry.full_name} from archive #{archive}"
        result = @engine.process(entry.full_name, entry.read, archive: archive)
        yield entry, result
        log "Result for #{entry.full_name} from archive #{archive}\n#{result.to_xml(@options[:debug_fields])}\n" if @options[:debug]
      end
    end

    #
    # Convert a single plain file. The result is written to a file of the
    # same base name inside the output directory, if one is configured.
    #
    def process_default_file(file)
      log "Processing file #{file}"
      # File.read closes the handle again, unlike File.open(...).read.
      result = @engine.process(file, File.read(file))

      if (dir = output_dir)
        out_file = File.join(dir, File.basename(file))
        File.open(out_file, 'w') { |f| f.write(result.to_xml) }
      end

      log "Result for #{file}\n#{result.to_xml(@options[:debug_fields])}\n" if @options[:debug]
    end

    #
    # Returns the absolute path of the configured output directory, or nil if
    # none was given. Raises a RuntimeError if the directory does not exist.
    #
    def output_dir
      return unless @options[:output_dir]

      dir = File.expand_path(@options[:output_dir])
      # Dir.exist? is used because Dir.exists? is no longer available in
      # current Ruby versions.
      raise "No such dir #{dir}" unless Dir.exist?(dir)

      dir
    end

    #
    # Print a message to the console unless silent mode is on. Messages are
    # prefixed with the name stored in Thread.current[:name], if any.
    #
    def log(message)
      return if @options[:silent]

      current_thread_name = Thread.current[:name]
      if current_thread_name.present?
        puts "[#{current_thread_name}] #{message}"
      else
        puts "#{message}"
      end
    end
  end
end