#!/usr/bin/env ruby # # === the rq program # # the rq program is the single command line interface by which all queue # operations are affected. it always takes, as it's first argument, the name of # the queue to be operated on. the second argument is always the mode of # operation. the action taken and meaning of subsequent arguments depends # directory on the mode of operation. for example the command # # rq queue create # # has the the mode _create_ and will create the queue _queue_. similarly the # command # # rq queue submit my_job.sh huge_input_file.dat # # runs in _submit_ mode and will sumbit a job to _queue_. # # run # # rq --help # # or see README # # for the detailed instructions for each of the operation modes # require 'rq-3.1.0' module RQ # # the Main class is responsible for parsing command line paramters and # switches, doing some validation, initializing logging, and, ultimately, # delegating the bulk of the work to a MainHelper based on the _mode_ given. # the relationship between Main and MainHelper is a tight one by design - the # primary purpose of it being to prevent the Main class from becoming 10000 # lines long. the delegators used include: # # * Creator # * Submitter # * Lister # * StatusLister # * Deleter # * Updater # * Querier # * Executor # * Configurator # * Snapshotter # * Locker # * Backer # * Rotater # * Feeder # * IOViewer # class Main #--{{{ include Util include Logging include Usage # an enumeration of option specifications used to parse command line OPTSPEC = #--{{{ [ [ '--priority=priority', '-p', 'modes : set the job(s) priority - lowest(0) .. highest(n) - (default 0)' ], [ '--tag=tag', '-t', 'modes : set the job(s) user data tag' ], [ '--runner=runner', 'modes : set the job(s) required runner(s)' ], [ '--restartable', 'modes : set the job(s) to be restartable on node reboot' ], [ '--stage', 'modes : set the job(s) initial state to be holding (default pending)' ], [ '--infile=infile', '-i', 'modes : infile' ], [ '--stdin=[stdin]', '-s', 'modes : stdin' ], [ '--quiet', '-q', 'modes : do not echo submitted jobs, fail silently if another process is already feeding' ], [ '--daemon', '-D', 'modes : spawn a daemon' ], [ '--max_feed=max_feed', 'modes : the maximum number of concurrent jobs run' ], [ '--retries=retries', 'modes : specify transaction retries' ], [ '--min_sleep=min_sleep', 'modes : specify min sleep' ], [ '--max_sleep=max_sleep', 'modes : specify max sleep' ], [ '--exit=exit_code_map', 'modes : specify and exit code map' ], [ '--fields=fields', '-f', 'limit which fields of output to display' ], [ '--snapshot', '-s', 'operate on snapshot of queue' ], [ '--editor=editor', '-e', 'editor command capable of opening multiple files at once = (default ENV["RQ_EDITOR"] || "vim -R -o")' ], [ '--verbosity=verbostiy', '-v', '0|fatal < 1|error < 2|warn < 3|info < 4|debug - (default info)' ], [ '--log=path','-l', 'set log file - (default stderr)' ], [ '--log_age=log_age', 'daily | weekly | monthly - what age will cause log rolling (default nil)' ], [ '--log_size=log_size', 'size in bytes - what size will cause log rolling (default nil)' ], [ '--dot_rq_dir=[dot_rq_dir]', 'base dir for log/pidfile storage (default ~/.rq/full/path/to/queue)' ], # [ # '--config=path', # 'valid path - specify config file (default nil)' # ], # [ # '--template=[path]', # 'valid path - generate a template config file in path (default stdout)' # ], [ '--help', '-h', 'this message' ], [ '--version', 'show version number' ], ] #--}}} # the default config file searched for has this basename CONFIG_DEFAULT_PATH = 'rq.conf' # config files are searched for using this list of locations CONFIG_SEARCH_PATH = %w( . ~ /dmsp/reference/etc /usr/local/etc /usr/etc /etc ) # the queue can be specified in the environment Q = ENV['RQ_Q'] || ENV['RQ_QUEUE'] attr :logger attr :argv attr :env attr :stdin attr :job_stdin attr :cmd attr :options attr :qpath attr :mode attr :q attr :daemon attr :quiet attr :fields attr :dot_rq_dir alias_method 'stdin?', 'stdin' alias_method 'job_stdin?', 'job_stdin' alias_method 'quiet?', 'quiet' # given a command line and environment run the rq program def initialize argv = ARGV, env = ENV #--{{{ begin @logger = Logger::new STDERR @argv = Util::mcp(argv.to_a) @env = Util::mcp(env.to_hash) @cmd = ([File::expand_path($0)] + ARGV).join(' ') @stdin = parse_stdin parse_options if(@options.has_key?('name')) $0 = ([@options['name']] + ARGV).join(' ') end if(@options.has_key?('help') or @argv.include?('help')) usage('port' => STDOUT, 'long' => true) exit EXIT_SUCCESS end if(@options.has_key?('template') or (idx = @argv.index('template'))) gen_template(@options['template'] || @argv[idx + 1]) exit EXIT_SUCCESS end if @options.has_key?('version') puts RQ::VERSION exit EXIT_SUCCESS end if(@options.has_key?('stdin')) @options['stdin'] ||= '-' @job_stdin = @options['stdin'] end if(@options.has_key?('quiet')) @quiet = true end if(@options.has_key?('fields')) @fields = @options['fields'].split(%r/,/).map{|f| f.strip} @fields.uniq! end parse_argv setup_dot_rq_dir status = run case status when Integer exit status else exit(status ? EXIT_SUCCESS : EXIT_FAILURE) end rescue => e unless SystemExit === e logerr e exit EXIT_FAILURE else exit e.status end end #--}}} end # extract command lines args def parse_argv #--{{{ @qpath = Q || @argv.shift @mode = @argv.shift #--}}} end # determine storage for logs/pidfiles def setup_dot_rq_dir #--{{{ if(@options.has_key?('dot_rq_dir')) @dot_rq_dir = @options['dot_rq_dir'] end if @dot_rq_dir.nil? home = ENV['HOME'] || File::expand_path('~') rescue abort("ENV['HOME'] is unset!") @dot_rq_dir = File::join home, '.rq', @qpath end FileUtils.mkdir_p @dot_rq_dir #--}}} end # select a MainHelper based on mode and delegate to it def run #--{{{ @qpath = Util::realpath @qpath if @mode.nil? or @mode.strip.empty? usage 'port' => STDERR, 'long' => false exit EXIT_FAILURE end shortcuts = { 'c' => 'create', 's' => 'submit', 'r' => 'resubmit', 're' => 'resubmit', 'l' => 'list', 'ls' => 'list', 't' => 'status', 'd' => 'delete', 'rm' => 'delete', 'u' => 'update', 'q' => 'query', 'e' => 'execute', 'C' => 'configure', 'S' => 'snapshot', 'L' => 'lock', 'B' => 'backup', 'R' => 'rotate', 'h' => 'help', 'H' => 'help', 'f' => 'feed', 'io' => 'ioview', '0' => 'stdin', '1' => 'stdout', '2' => 'stderr', 'to' => 'touch', 'ta' => 'tail', 'cron' => 'cron', } if((longmode = shortcuts[@mode])) @mode = longmode end begin case @mode when 'create' create when 'submit' submit when 'resubmit' resubmit when 'list' list when 'status' status when 'delete' delete when 'update' update when 'query' query when 'execute' execute when 'configure' configure when 'snapshot' snapshot when 'lock' lock when 'backup' backup when 'rotate' rotate when 'help' usage 'port' => STDOUT, 'long' => true exit EXIT_SUCCESS when 'feed' feed when 'start' start when 'shutdown' shutdown when 'stop' stop when 'restart' restart when 'pid' pid when 'feeder' feeder when 'recover' recover when 'ioview' ioview when 'stdin' dump_stdin when 'stdout' dump_stdout when 'stderr' dump_stderr when 'stdin4' stdin4 when 'stdout4' stdout4 when 'stderr4' stderr4 when 'touch' touch when 'tail' tail when 'cron' cron else raise "invalid mode <#{ @mode }>" end self rescue Errno::EPIPE => e raise if STDOUT.tty? end EXIT_SUCCESS #--}}} end # delegated to a Creator def create #--{{{ init_logging creator = Creator::new self creator.create #--}}} end # delegated to a Submitter def submit #--{{{ init_logging submitter = Submitter::new self submitter.submit #--}}} end # delegated to a ReSubmitter def resubmit #--{{{ init_logging resubmitter = ReSubmitter::new self resubmitter.resubmit #--}}} end # delegated to a Lister def list #--{{{ init_logging @options['snapshot'] = true lister = Lister::new self lister.list #--}}} end # delegated to a StatusLister def status #--{{{ init_logging @options['snapshot'] = true statuslister = StatusLister::new self statuslister.statuslist #--}}} end # delegated to a Deleter def delete #--{{{ init_logging deleter = Deleter::new self deleter.delete #--}}} end # delegated to a Updater def update #--{{{ init_logging updater = Updater::new self updater.update #--}}} end # delegated to a Querier def query #--{{{ init_logging querier = Querier::new self querier.query #--}}} end # delegated to a Executor def execute #--{{{ init_logging executor = Executor::new self executor.execute #--}}} end # delegated to a Configurator def configure #--{{{ init_logging configurator = Configurator::new self configurator.configure #--}}} end # delegated to a Snapshotter def snapshot #--{{{ init_logging snapshotter = Snapshotter::new self snapshotter.snapshot #--}}} end # delegated to a Locker def lock #--{{{ init_logging locker = Locker::new self locker.lock #--}}} end # delegated to a Backer def backup #--{{{ init_logging backer = Backer::new self backer.backup #--}}} end # delegated to a Rotater def rotate #--{{{ init_logging rotater = Rotater::new self rotater.rotate #--}}} end # delegated to a Feeder def feed #--{{{ feeder = Feeder::new self feeder.feed #--}}} end # quietly start a daemon process def start #--{{{ unless exists @options['daemon'] = true @options['quiet'] = true @options['log'] ||= File.join(@dot_rq_dir, 'log') feeder = Feeder::new self feeder.feed end #--}}} end # clean stop def shutdown #--{{{ pid = (exists and signal_feeder('TERM')) puts "pid <#{ pid }> signaled to stop a.s.a.p" if pid exit(Integer === pid ? 0 : 1) #--}}} end # hard stop def stop #--{{{ pid = (exists and signal_feeder('KILL')) puts "pid <#{ pid }> signaled to stop now" if pid exit(Integer === pid ? 0 : 1) #--}}} end # sighup based restart def restart #--{{{ pid = (exists and signal_feeder('HUP')) puts "pid <#{ pid }> signaled to restart" if pid exit(Integer === pid ? 0 : 1) #--}}} end # is a feeder running? def feeder #--{{{ arg = @argv.shift case arg when /pid/ pid else puts "---\nfeeder : #{ exists ? true : false }" end #--}}} end # pid of any running feeder def pid #--{{{ puts "---\npid : #{ exists || '~' }" #--}}} end # attempt sqlite db recovery def recover #--{{{ init_logging recoverer = Recoverer::new self recoverer.recover #--}}} end # spawn external process to view stdin/stdout/stderr of jids def ioview #--{{{ init_logging ioviewer = IOViewer::new self ioviewer.ioview #--}}} end # dump stdin for jid def dump_stdin #--{{{ dump_ios 'stdin', jids4(@argv) #--}}} end # dump stdout for jid def dump_stdout #--{{{ dump_ios 'stdout', jids4(@argv) #--}}} end # dump stderr for jid def dump_stderr #--{{{ dump_ios 'stderr', jids4(@argv) #--}}} end # dump stdin path for jid def stdin4 jids = nil #--{{{ if jids File.join @qpath, 'stdin', jids.to_s else jids = jids4 @argv #STDOUT << "---\n" jids.flatten.each do |jid| iopath = File.join @qpath, 'stdin', jid.to_s #STDOUT << " - " << iopath << "\n" puts iopath end end #--}}} end # dump stdout path for jid def stdout4 jids = nil #--{{{ if jids File.join @qpath, 'stdout', jids.to_s else jids = jids4 @argv #STDOUT << "---\n" jids.flatten.each do |jid| iopath = File.join @qpath, 'stdout', jid.to_s #STDOUT << " - " << iopath << "\n" puts iopath end end #--}}} end # dump stderr path for jid def stderr4 jids = nil #--{{{ if jids File.join @qpath, 'stderr', jids.to_s else jids = jids4 @argv #STDOUT << "---\n" jids.flatten.each do |jid| iopath = File.join @qpath, 'stderr', jid.to_s #STDOUT << " - " << iopath << "\n" puts iopath end end #--}}} end # delegated to a Toucher def touch #--{{{ init_logging toucher = Toucher::new self toucher.touch #--}}} end # spawn external process to tail stdin/stdout/stderr of jids def tail #--{{{ @options['editor'] = 'tail -F' init_logging ioviewer = IOViewer::new self ioviewer.ioview rescue nil #--}}} end # add/delete crontab entry def cron #--{{{ init_logging cron = Cron::new self cron.cron #--}}} end def dump_ios which, jids #--{{{ jids.each do |jid| iopath = send "#{ which }4", jid begin cat iopath rescue next end end #--}}} end def cat path #--{{{ system("cat #{ path } 2>/dev/null") or open(path){|f| f.each{|line| print line}} #--}}} end def jids4 *list #--{{{ jids = list.flatten.map{|elem| Integer elem} #@stdin.each{|line| line.strip!; next if line.empty?; jids << line} if @stdin if @stdin mainhelper = MainHelper.new(self) jobs = [] mainhelper.loadio @stdin, 'stdin', jobs jobs.each{|job| jids << job['jid']} jids.map!{|jid| Integer(jid) rescue abort("bad jid <#{ jid.inspect }>")} end jids #--}}} end def exists #--{{{ begin signal_feeder 0 rescue Errno::ESRCH false end #--}}} end def signal_feeder sig #--{{{ feeder = Feeder::new self pidfilepath = feeder.gen_pidfilepath pid = Integer(IO::read(pidfilepath)) rescue nil begin Process::kill(sig, pid) pid rescue nil end #--}}} end # parses '-' from cmdline, but not if it's after a '--' def parse_stdin #--{{{ dash_dash, dash = %w( -- - ).map{|d| @argv.index d} if dash if dash_dash if dash < dash_dash @argv.delete '-' STDIN end else @argv.delete '-' STDIN end end #--}}} end # uses OPTSPEC to parse command line switches def parse_options #--{{{ @op = OptionParser.new @options = {} OPTSPEC.each do |spec| k = spec.first.gsub(%r/(?:--)|(?:=.*$)|(?:\s+)/o,'') @op.def_option(*spec){|v| v = v.to_s; @options[k] = v.empty? ? nil : v} #@op.def_option(*spec){|v| @options[k] = v} end if((env_opts = (ENV['RQ_OPTS'] || ENV['RQ_OPTIONS']))) require 'shellwords' @op.parse! Shellwords.shellwords(env_opts) end @op.parse! @argv @options #--}}} end # initialize logging object - all classes then use this object def init_logging #--{{{ log, log_age, log_size, verbosity = @options.values_at 'log', 'log_age', 'log_size', 'verbosity' log_age = atoi log_age rescue nil log_size = atoi log_size rescue nil $logger = @logger = Logger::new(log || STDERR, log_age, log_size) # # hack to fix Logger sync bug # @logger.class.instance_eval do attr :logdev unless @logger.respond_to?(:logdev) end @logdev = @logger.logdev.dev @logdev.sync = true level = nil verbosity ||= 'info' verbosity = case verbosity when /^\s*(?:4|d|debug)\s*$/io level = 'Logging::DEBUG' 4 when /^\s*(?:3|i|info)\s*$/io level = 'Logging::INFO' 3 when /^\s*(?:2|w|warn)\s*$/io level = 'Logging::WARN' 2 when /^\s*(?:1|e|error)\s*$/io level = 'Logging::ERROR' 1 when /^\s*(?:0|f|fatal)\s*$/io level = 'Logging::FATAL' 0 else abort "illegal verbosity setting <#{ verbosity }>" end @logger.level = 2 - ((verbosity % 5) - 2) #debug {"logging level <#{ level }>"} @logger #--}}} end # initialize configuration file - not currenlty utilized def init_config #--{{{ @config = if @options['config'] ConfigFile::new(@options['config']) else ConfigFile::any CONFIG_DEFAULT_PATH, CONFIG_SEARCH_PATH end debug { "config.path <#{ @config.path }>" } @config #--}}} end # generate a template/sample config file which can then be edited def gen_template template #--{{{ ConfigFile::gen_template(template) self #--}}} end #--}}} end end # # run main program unless included as a library (testing purposes) # RQ::Main::new ARGV, ENV