require 'pbs'
# == Helper object: ruby interface to torque shell commands
# in the same vein as stdlib's Shell which
# "implements an idiomatic Ruby interface for common UNIX shell commands"
# also helps to have these separate so we can use a mock shell for unit tests
#
# == FIXME: This contains no state whatsoever. It should probably be changed into a module.
class OSC::Machete::TorqueHelper
# Alias to initialize a new object.
def self.default
self::new()
end
# Returns an OSC::Machete::Status ValueObject for a char
#
# @param [String] char The Torque status char
#
# @example Completed
# status_for_char("C") #=> OSC::Machete::Status.completed
# @example Queued
# status_for_char("W") #=> OSC::Machete::Status.queued
#
# @return [OSC::Machete::Status] The status corresponding to the char
def status_for_char(char)
case char
when "C", nil
OSC::Machete::Status.passed
when "Q", "T", "W" # T W happen before job starts
OSC::Machete::Status.queued
when "H"
OSC::Machete::Status.held
else
# all other statuses considerd "running" state
# including S, E, etc.
# see http://docs.adaptivecomputing.com/torque/4-1-3/Content/topics/commands/qstat.htm
OSC::Machete::Status.running
end
end
#*TODO:*
# consider using cocaine gem
# consider using Shellwords and other tools
# usage: qsub("/path/to/script") or
# qsub("/path/to/script", depends_on: { afterany: ["1234.oak-batch.osc.edu"] })
#
# Where depends_on is a hash with key being dependency type and array containing the
# arguments. See documentation on dependency_list in qsub man pages for details.
#
# Bills against the project specified by the primary group of the user.
def qsub(script, host: nil, depends_on: {}, account_string: nil)
# if the script is set to run on Oakley in PBS headers
# this is to obviate current torque filter defect in which
# a script with PBS header set to specify oak-batch ends
# isn't properly handled and the job gets limited to 4GB
pbs_job = get_pbs_job( host.nil? ? get_pbs_conn(script: script) : get_pbs_conn(host: host) )
headers = { depend: qsub_dependencies_header(depends_on) }
headers.clear if headers[:depend].empty?
# currently we set the billable project to the name of the primary group
# this will probably be both SUPERCOMPUTER CENTER SPECIFIC and must change
# when we want to enable our users at OSC to specify which billable project
# to bill against
if account_string
headers[PBS::ATTR[:A]] = account_string
elsif account_string_valid_project?(default_account_string)
headers[PBS::ATTR[:A]] = default_account_string
end
pbs_job.submit(file: script, headers: headers, qsub: true).id
end
# convert dependencies hash to a PBS header string
def qsub_dependencies_header(depends_on = {})
depends_on.map { |x|
x.first.to_s + ":" + Array(x.last).join(":") unless Array(x.last).empty?
}.compact.join(",")
end
# return the account string required for accounting purposes
# having this in a separate method is useful for monkeypatching in short term
# or overridding with a subclass you pass into OSC::Machete::Job
#
# FIXME: this may belong on OSC::Machete::User; but it is OSC specific...
#
# @return [String] the project name that job submission should be billed against
def default_account_string
OSC::Machete::Process.new.groupname
end
def account_string_valid_project?(account_string)
/^P./ =~ account_string
end
# Performs a qstat request on a single job.
#
# **FIXME: this might not belong here!**
#
# @param [String] pbsid The pbsid of the job to inspect.
#
# @return [Status] The job state
def qstat(pbsid, host: nil)
# Create a PBS::Job object based on the pbsid or the optional host param
pbs_conn = host.nil? ? get_pbs_conn(pbsid: pbsid.to_s) : get_pbs_conn(host: host)
pbs_job = get_pbs_job(pbs_conn, pbsid)
job_status = pbs_job.status
# Get the status char value from the job.
status_for_char job_status[:attribs][:job_state][0]
rescue PBS::UnkjobidError => err
OSC::Machete::Status.passed
end
# Perform a qdel command on a single job.
#
# @param [String] pbsid The pbsid of the job to be deleted.
#
# @return [nil]
def qdel(pbsid, host: nil)
pbs_conn = host.nil? ? get_pbs_conn(pbsid: pbsid.to_s) : get_pbs_conn(host: host)
pbs_job = get_pbs_job(pbs_conn, pbsid.to_s)
pbs_job.delete
rescue PBS::UnkjobidError => err
# Common use case where trying to delete a job that is no longer in the system.
end
private
# Factory to return a PBS::Job object
def get_pbs_job(conn, pbsid=nil)
pbsid.nil? ? PBS::Job.new(conn: conn) : PBS::Job.new(conn: conn, id: pbsid.to_s)
end
# Returns a PBS connection object
#
# @option [:script] A PBS script with headers as string
# @option [:pbsid] A valid pbsid as string
#
# @return [PBS::Conn] A connection option for the PBS host (Default: Oakley)
def get_pbs_conn(options={})
if options[:script]
PBS::Conn.batch(host_from_script_pbs_header(options[:script]))
elsif options[:pbsid]
PBS::Conn.batch(host_from_pbsid(options[:pbsid]))
elsif options[:host]
PBS::Conn.batch(options[:host])
else
PBS::Conn.batch("oakley")
end
end
# return the name of the host to use based on the pbs header
# TODO: Think of a more efficient way to do this.
def host_from_script_pbs_header(script)
if (File.open(script) { |f| f.read =~ /#PBS -q @oak-batch/ })
"oakley"
elsif (File.open(script) { |f| f.read =~ /#PBS -q @opt-batch/ })
"glenn"
elsif (File.open(script) { |f| f.read =~ /#PBS -q @ruby-batch/ })
"ruby"
elsif (File.open(script) { |f| f.read =~ /#PBS -q @quick-batch/ })
"quick"
else
"oakley" # DEFAULT
end
end
# Return the PBS host string based on a full pbsid string
def host_from_pbsid(pbsid)
if (pbsid =~ /oak-batch/ )
"oakley"
elsif (pbsid =~ /opt-batch/ )
"glenn"
elsif (pbsid.to_s =~ /^\d+$/ )
"ruby"
elsif (pbsid =~ /quick/ )
"quick"
else
"oakley" # DEFAULT
end
end
end