require 'nera_db_folders'
require 'nera_job_records'
require 'nera_job_script'
require 'fileutils'
require 'yaml'
require 'rubygems'
require 'net/ssh'
require 'net/sftp'

module NERA

  # A class for remote connection.
  #
  # Reads the host definitions from hosts.yml, uploads and submits job
  # scripts over SSH/SFTP, and downloads the result archives.
  #
  # Instance variables:
  #   @db_folders  : instance of NERA::DbFolders
  #   @job_records : instance of NERA::JobRecords
  #   @hosts       : hosts (Array of Hashes with Symbol keys)
  class RemoteConnector

    # Matches a result archive name such as "j12.tar.bz2" and captures the
    # job name ("j12"). The dots are escaped so only literal dots match.
    FINISHED_ARCHIVE = /\A(j[0-9]+)\.tar\.bz2\z/

    # Suffix every downloadable file name must have.
    ARCHIVE_SUFFIX = /\.tar\.bz2\z/

    public

    # path_db_folder : String path handed to NERA::DbFolders.new.
    #
    # Raises ArgumentError unless path_db_folder is a String, and
    # RuntimeError ("Invalid format of hosts.yml") when the host list is
    # malformed. A missing hosts file yields an empty host list.
    def initialize( path_db_folder)
      raise ArgumentError unless path_db_folder.is_a?(String)
      @db_folders = NERA::DbFolders.new( path_db_folder)
      hosts_yaml = @db_folders.path_to_hosts_yaml
      if File.exist?( hosts_yaml)
        # NOTE(review): YAML.load on a file that is not fully trusted can
        # instantiate arbitrary objects on older Psych versions; confirm
        # that hosts.yml is always operator-controlled.
        @hosts = File.open( hosts_yaml, 'r') { |io| YAML.load( io) }
      else
        @hosts = []
      end
      raise "Invalid format of hosts.yml" unless valid_hosts?
      @job_records = NERA::JobRecords.new( @db_folders.path_to_jobs_table)
    end

    # Names of all the configured hosts.
    def hostnames
      @hosts.map { |host| host[:name] }
    end

    # Names of the hosts that define a :submission_command.
    def submittable_hostnames
      submittable = @hosts.select { |host| host[:submission_command] }
      submittable.map { |host| host[:name] }
    end

    # Runs the status command of the host (:show_status_command, or 'ps au'
    # when not configured) and returns its output.
    # Returns :connection_failed for an unknown host or a failed connection.
    def show_status( hostname)
      host = find_host( hostname)
      return :connection_failed unless host

      cmd = host.has_key?(:show_status_command) ? host[:show_status_command] : 'ps au'
      output = ''
      connected = connect( host, ssh_options( host, :timeout => 10)) do |ssh|
        output = ssh.exec!( cmd)
      end
      connected ? output : :connection_failed
    end

    # Uploads the job scripts of the given job ids to the host and marks each
    # uploaded job as copied.
    #
    # jobids   : Array of Integer job ids
    # hostname : String name of a configured host
    #
    # Returns :succeeded, :no_such_jobs (none of the ids is in the job
    # records) or :connection_failed. Raises ArgumentError on wrong types.
    def transfer( jobids, hostname)
      validate_job_arguments( jobids, hostname)
      host = find_host( hostname)
      return :connection_failed unless host

      # The early returns below leave this method from inside the
      # transaction block, exactly as the record lookups did originally.
      @job_records.transaction {
        jids = existing_job_ids( jobids)
        return :no_such_jobs if jids.size == 0
        connected = connect( host, ssh_options( host)) do |ssh|
          upload_job_scripts( ssh, host, jids)
        end
        return :connection_failed unless connected
      }
      return :succeeded
    end

    # Uploads the job scripts like #transfer, then runs the host's
    # :submission_command on each uploaded script and marks the job as
    # submitted.
    #
    # Returns :succeeded, :no_submission_command, :no_such_jobs or
    # :connection_failed. Raises ArgumentError on wrong types.
    def submit( jobids, hostname)
      validate_job_arguments( jobids, hostname)
      host = find_host( hostname)
      return :connection_failed unless host
      return :no_submission_command unless host.has_key?(:submission_command)

      @job_records.transaction {
        jids = existing_job_ids( jobids)
        return :no_such_jobs if jids.size == 0
        connected = connect( host, ssh_options( host)) do |ssh|
          upload_job_scripts( ssh, host, jids) do |jid, target_dir, target_path|
            ssh.exec!("cd #{target_dir} && #{host[:submission_command]} #{target_path} &")
            @job_records.update_to_state_submitted( jid, host[:name])
          end
        end
        return :connection_failed unless connected
      }
      return :succeeded
    end

    # Lists the job directory of the host and returns the names of the
    # finished result archives (see #finished_list).
    # Returns :connection_failed for an unknown host or a failed connection.
    # Raises ArgumentError unless hostname is a String.
    def check_completion( hostname)
      raise ArgumentError unless hostname.is_a?(String)
      host = find_host( hostname)
      return :connection_failed unless host

      ls_str = ''
      connected = connect( host, ssh_options( host)) do |ssh|
        ls_str = ssh.exec!("ls #{host[:job_path]}")
      end
      return :connection_failed unless connected
      return finished_list( ls_str)
    end

    # Runs #check_completion on every configured host.
    # Returns a Hash mapping each host name to its result.
    def check_completion_all
      results = {}
      hostnames.each do |hname|
        results[hname] = check_completion( hname)
      end
      return results
    end

    # Downloads the given archives from the job directory of the host, then
    # deletes each archive and its matching ".sh" job script on the remote.
    #
    # filenames : Array of String file names ending in ".tar.bz2"
    # hostname  : String name of a configured host
    #
    # Returns the Array of file names actually downloaded (only names that
    # are present in the remote listing), or :connection_failed.
    # Raises ArgumentError on wrong types or file names.
    def download( filenames, hostname)
      raise ArgumentError unless filenames.is_a?(Array)
      filenames.each do |fname|
        raise ArgumentError unless fname.is_a?(String)
        raise ArgumentError unless fname =~ ARCHIVE_SUFFIX
      end
      raise ArgumentError unless hostname.is_a?(String)
      host = find_host( hostname)
      return :connection_failed unless host

      downloaded = []
      connected = connect( host, ssh_options( host), true) do |ssh|
        target_dir = remote_job_dir( ssh, host)
        # Compare whole listing entries: a substring test would also select
        # "1.tar.bz2" when only "j11.tar.bz2" exists. to_s guards against a
        # nil result from exec!.
        remote_entries = ssh.exec!("ls #{target_dir}").to_s.split("\n")
        wanted = filenames.select { |fname| remote_entries.include?( fname) }
        ssh.sftp.connect do |sftp|
          wanted.each do |d_file|
            # download
            $stderr.print " downloading #{d_file}.....\t"
            # NOTE(review): plain concatenation assumes path_to_include_layer
            # ends with a path separator - confirm in NERA::DbFolders.
            sftp.download!( target_dir + '/' + d_file, @db_folders.path_to_include_layer + d_file)
            $stderr.puts "done"
            downloaded << d_file
            # delete data file and job script
            $stderr.print " deleting #{d_file}.....\t"
            sftp.remove!( target_dir + '/' + d_file)
            $stderr.puts "done"
            sh_file = d_file.sub(/tar\.bz2\z/, 'sh')
            $stderr.print " deleting #{sh_file}.....\t"
            sftp.remove!( target_dir + '/' + sh_file)
            $stderr.puts "done"
          end
        end
      end
      connected ? downloaded : :connection_failed
    end

    private

    # Checks that @hosts is an Array of Hashes whose :name, :host_url and
    # :user entries are Strings, and fills in the defaults for :job_path
    # ('~/') and :port (22). Returns true or false.
    def valid_hosts?
      # An empty or malformed YAML document does not load as an Array.
      return false unless @hosts.is_a?(Array)
      @hosts.each do |host|
        return false unless host.is_a?(Hash)
        [:name, :host_url, :user].each do |key|
          return false unless host[key].is_a?(String)
        end
        host[:job_path] = '~/' unless host.has_key?(:job_path)
        host[:port] = 22 unless host.has_key?(:port)
      end
      true
    end

    # Placeholder; the method body is empty.
    def convert_shell_script
    end

    # Returns the host Hash whose :name equals hostname, or nil.
    def find_host( hostname)
      @hosts.find { |host| host[:name] == hostname }
    end

    # Builds the option Hash for Net::SSH.start: the port, the optional
    # :keys entry of the host, plus any extra options.
    def ssh_options( host, extra = {})
      options = { :port => host[:port] }.merge( extra)
      options[:keys] = host[:keys] if host.has_key?(:keys)
      options
    end

    # Shared argument check of #transfer and #submit.
    def validate_job_arguments( jobids, hostname)
      raise ArgumentError unless jobids.is_a?(Array)
      jobids.each do |jid|
        raise ArgumentError unless jid.is_a?(Integer)
      end
      raise ArgumentError unless hostname.is_a?(String)
    end

    # Returns the ids from jobids that are found in the job records.
    def existing_job_ids( jobids)
      jobids.select { |jid| @job_records.find_by_id( jid) }
    end

    # Opens an SSH session to the host and yields it, reporting progress on
    # $stderr. banner_newline selects puts instead of print for the
    # "Connecting" banner (#download ends the banner with a newline).
    # Returns true on success, false when one of the connection errors was
    # raised anywhere inside the session.
    def connect( host, options, banner_newline = false)
      banner = "Connecting with #{host[:name]}.....\t"
      if banner_newline
        $stderr.puts banner
      else
        $stderr.print banner
      end
      begin
        Net::SSH.start( host[:host_url], host[:user], options) do |ssh|
          yield ssh
        end
      rescue Errno::ECONNREFUSED, Errno::ENETUNREACH, SocketError, Net::SSH::Exception
        $stderr.puts "failed"
        return false
      end
      $stderr.puts "done"
      true
    end

    # Returns the job path of the host with a leading "~" replaced by the
    # remote $HOME.
    def remote_job_dir( ssh, host)
      target_home = ssh.exec!('echo $HOME').to_s.chomp
      # Block form: the home path must not be read as a sub() replacement
      # pattern (backslash sequences).
      host[:job_path].sub(/\A~/) { target_home }
    end

    # Creates the remote job directory, then uploads the job script of each
    # id in jids (ids without a local script are skipped), makes it
    # executable and marks the job as copied. Yields
    # (jid, target_dir, target_path) after each upload when a block is given.
    def upload_job_scripts( ssh, host, jids)
      # exec! waits for completion; the non-blocking exec only queues the
      # command, so the directory might not exist yet when the upload starts.
      mkdir_output = ssh.exec!("mkdir -p #{host[:job_path]}").to_s
      $stderr.print mkdir_output unless mkdir_output.empty?
      target_dir = remote_job_dir( ssh, host)
      ssh.sftp.connect do |sftp|
        jids.each do |jid|
          pjs = @db_folders.path_to_job_script( jid)
          next unless File.exist?( pjs)
          target_path = target_dir + '/' + File.basename( pjs)
          contents = script_with_cd( File.readlines( pjs), target_dir)
          sftp.file.open( target_path, "w") do |f|
            f.puts contents
          end
          ssh.exec!("chmod +x #{target_path}")
          @job_records.update_to_state_copied( jid, host[:name])
          yield jid, target_dir, target_path if block_given?
        end
      end
    end

    # Returns the script lines with a "cd <target_dir>" line inserted right
    # after the first line. Handles an empty script and a first line without
    # a trailing newline.
    def script_with_cd( lines, target_dir)
      cd_line = "cd #{target_dir}\n"
      return [cd_line] if lines.empty?
      first = lines.first
      first += "\n" unless first.end_with?("\n")
      [first, cd_line] + lines[1..-1]
    end

    # From the output of "ls", returns the archive names "jN.tar.bz2" for
    # which no entry named "jN" is present in the same listing.
    def finished_list( ls_str)
      entries = ls_str.to_s.split("\n")
      entries.select do |fname|
        matched = FINISHED_ARCHIVE.match( fname)
        matched && !entries.include?( matched[1])
      end
    end
  end
end