RQ::Toucher (Class)

In: lib/rq-3.0.0/toucher.rb
Parent: MainHelper
MainHelper StatusLister Snapshotter ReSubmitter Feeder Deleter Toucher Relayer Executor Submitter Locker IOViewer Backer Cron Configurator Lister Rotater Creator Recoverer Updater Querier ::Hash ConfigFile DRbUndumped JobRunner Main QDB JobQueue JobRunnerDaemon Array SleepCycle Job ArrayFields ::OrderedHash OrderedAutoHash LogMethods Refresher ResourceManager Resource lib/rq-3.0.0/refresher.rb lib/rq-3.0.0/snapshotter.rb lib/rq-3.0.0/deleter.rb lib/rq-3.0.0/feeder.rb lib/rq-3.0.0/configurator.rb lib/rq-3.0.0/cron.rb lib/rq-3.0.0/jobqueue.rb lib/rq-3.0.0/rotater.rb lib/rq-3.0.0/backer.rb lib/rq-3.0.0/toucher.rb lib/rq-3.0.0/qdb.rb lib/rq-3.0.0/configfile.rb lib/rq-3.0.0/mainhelper.rb lib/rq-3.0.0/lister.rb bin/rq.rb lib/rq-3.0.0/statuslister.rb lib/rq-3.0.0/updater.rb lib/rq-3.0.0/jobrunner.rb lib/rq-3.0.0/job.rb lib/rq-3.0.0/creator.rb lib/rq-3.0.0/sleepcycle.rb lib/rq-3.0.0/executor.rb lib/rq-3.0.0/resubmitter.rb lib/rq-3.0.0/orderedautohash.rb lib/rq-3.0.0/resourcemanager.rb lib/rq-3.0.0/resource.rb lib/rq-3.0.0/jobrunnerdaemon.rb lib/rq-3.0.0/recoverer.rb lib/rq-3.0.0/querier.rb lib/rq-3.0.0/ioviewer.rb lib/rq-3.0.0/locker.rb lib/rq-3.0.0/submitter.rb lib/rq-3.0.0/relayer.rb Usage Util LogClassMethods LoggerExt LogMethods Logging RQ Module: RQ

nodoc

Methods

touch  

Public Instance methods

[Source]

     # File lib/rq-3.0.0/toucher.rb, line 15
 15:       def touch
 16: #--{{{
 17:         set_q
 18: 
 19:         @priority = @options['priority']
 20:         debug{ "priority <#{ @priority }>" }
 21: 
 22:         @tag = @options['tag']
 23:         debug{ "tag <#{ @tag }>" }
 24: 
 25:         @runner = @options['runner']
 26:         debug{ "runner <#{ @runner }>" }
 27: 
 28:         @restartable = @options['restartable']
 29:         debug{ "restartable <#{ @restartable }>" }
 30: 
 31:         @infile = @options['infile'] 
 32:         debug{ "infile <#{ @infile }>" }
 33: 
 34:         @job_stdin = @options['stdin'] 
 35:         debug{ "job_stdin <#{ @job_stdin }>" }
 36: 
 37:         @stage = @options['stage']
 38:         debug{ "stage <#{ @stage }>" }
 39: 
 40:         if job_stdin == '-' and stdin?
 41:           abort "cannot specify both jobs and job input on stdin"
 42:         end
 43: 
 44:         jobs = [] 
 45: 
 46:         unless @argv.empty?
 47:           job = Job::new
 48:           job['command'] = @argv.join(' ') 
 49:           job['priority'] = @priority
 50:           job['tag'] = @tag 
 51:           job['runner'] = @runner
 52:           job['restartable'] = @restartable
 53:           jobs << job
 54:         end
 55: 
 56:         if @infile
 57:           open(@infile) do |f|
 58:             debug{ "reading jobs from <#{ @infile }>" }
 59:             loadio f, @infile, jobs 
 60:           end
 61:         end
 62: 
 63:         if jobs.empty? and stdin? 
 64:           debug{ "reading jobs from <stdin>" }
 65:           loadio stdin, 'stdin', jobs 
 66:         end
 67: 
 68:         abort "no jobs specified!" if jobs.empty?
 69: 
 70:         init_job_stdin!
 71:         
 72:         state = @stage ? 'holding' : 'pending'
 73: 
 74:         jobs.each do |job|
 75:           job['state'] = state
 76:           job['priority'] = @priority if @options.has_key?('priority')
 77:           job['tag'] = @tag if @options.has_key?('tag')
 78:           job['runner'] = @runner if @options.has_key?('runner')
 79:           job['restartable'] = @restartable if @options.has_key?('restartable')
 80:           job['stdin'] = @job_stdin if @job_stdin
 81:         end
 82: 
 83:       #
 84:       # state + lambdas for submit process...
 85:       #
 86: 
 87:         list = []
 88: 
 89:         tmpfile =
 90:           lambda do |basename|
 91:             basename = File.basename basename.to_s
 92:             Tempfile.new "#{ basename }_#{ Process.pid }_#{ rand.to_s }"
 93:           end
 94: 
 95:         update_job = 
 96:           lambda do |pjob, ujob|
 97:             kvs, jid = {}, pjob['jid']
 98:           # handle stdin
 99:             pstdin, ustdin = pjob['stdin'], ujob['stdin']
100:             if pstdin || ustdin
101:               pbuf =
102:                 if pstdin
103:                   pstdin = @q.standard_in_4 jid
104:                   IO.read pstdin if test ?e, pstdin
105:                 end
106:               ubuf =
107:                 if ustdin
108:                   IO.read ustdin if test ?e, ustdin
109:                 end
110: #y 'pbuf' => pbuf
111: #y 'ubuf' => ubuf
112:               f = ustdin ? open(ustdin,'w') : tmpfile[ustdin]
113:               begin
114:                 f.write pbuf if pbuf
115:                 f.write ubuf if pbuf
116:               ensure
117:                 f.close
118:               end
119:               kvs['stdin'] = ujob['stdin'] = f.path
120: #y 'stdin' => ujob['stdin']
121:             end
122:           # handle other keys
123:             allowed = %( priority runner restartable )
124:             allowed.each do |key|
125:               val = ujob[key]
126:               kvs[key] = val if val
127:             end
128:             @q.update(kvs, jid){|updated| list << updated}
129:           end
130: 
131:         submit_job = 
132:           lambda do |job|
133:             @q.submit(job){|submitted| list << submitted}
134:           end
135: 
136: 
137:       #
138:       # update or submit
139:       #
140:         @q.transaction do
141:           pending = @q.list 'pending'
142: 
143:           pjobs, pcommands = {}, {}
144: 
145:           pending.each do |job|
146:             jid = job['jid']
147:             command = job['command'].strip
148:             tag = job['tag'].to_s.strip
149:             pjobs[jid] = job
150:             pcommands[[command, tag]] = jid
151:           end
152: 
153:           jobs.each do |job|
154:             jid = job['jid']
155:             command = job['command'].strip
156:             tag = job['tag'].to_s.strip
157:             if((jid = pcommands[[command, tag]]))
158:               update_job[ pjobs[jid], job ] 
159:             else
160:               submit_job[ job ]
161:             end
162:           end
163:         end
164: 
165:         list.each &dumping_yaml_tuples unless @options['quiet']
166:     
167:         jobs = nil
168:         list = nil
169:         self
170: #--}}}
171:       end

[Validate]