RQ::Feeder (Class)

In: lib/rq-3.0.0/feeder.rb
Parent: MainHelper
MainHelper StatusLister Snapshotter ReSubmitter Feeder Deleter Toucher Relayer Executor Submitter Locker IOViewer Backer Cron Configurator Lister Rotater Creator Recoverer Updater Querier ::Hash ConfigFile DRbUndumped JobRunner Main QDB JobQueue JobRunnerDaemon Array SleepCycle Job ArrayFields ::OrderedHash OrderedAutoHash LogMethods Refresher ResourceManager Resource lib/rq-3.0.0/refresher.rb lib/rq-3.0.0/snapshotter.rb lib/rq-3.0.0/deleter.rb lib/rq-3.0.0/feeder.rb lib/rq-3.0.0/configurator.rb lib/rq-3.0.0/cron.rb lib/rq-3.0.0/jobqueue.rb lib/rq-3.0.0/rotater.rb lib/rq-3.0.0/backer.rb lib/rq-3.0.0/toucher.rb lib/rq-3.0.0/qdb.rb lib/rq-3.0.0/configfile.rb lib/rq-3.0.0/mainhelper.rb lib/rq-3.0.0/lister.rb bin/rq.rb lib/rq-3.0.0/statuslister.rb lib/rq-3.0.0/updater.rb lib/rq-3.0.0/jobrunner.rb lib/rq-3.0.0/job.rb lib/rq-3.0.0/creator.rb lib/rq-3.0.0/sleepcycle.rb lib/rq-3.0.0/executor.rb lib/rq-3.0.0/resubmitter.rb lib/rq-3.0.0/orderedautohash.rb lib/rq-3.0.0/resourcemanager.rb lib/rq-3.0.0/resource.rb lib/rq-3.0.0/jobrunnerdaemon.rb lib/rq-3.0.0/recoverer.rb lib/rq-3.0.0/querier.rb lib/rq-3.0.0/ioviewer.rb lib/rq-3.0.0/locker.rb lib/rq-3.0.0/submitter.rb lib/rq-3.0.0/relayer.rb Usage Util LogClassMethods LoggerExt LogMethods Logging RQ Module: RQ

The Feeder class is responsible for running jobs from a queue - or ‘feeding’ from that queue. Its mode of operation is essentially to run jobs as quickly as possible, return them to the queue, and then run more jobs if any exist. If no jobs exist, the Feeder periodically polls the queue to see whether any new jobs have arrived.

Constants

DEFAULT_MIN_SLEEP = 42
DEFAULT_MAX_SLEEP = 240
DEFAULT_FEED = 2

Attributes

feed  [RW] 
max_sleep  [RW] 
min_sleep  [RW] 

Public Instance methods

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 504
# True once the number of tracked children has reached the @max_feed limit,
# i.e. no further jobs should be started right now.
def busy?
#--{{{
  !(@children.size < @max_feed)
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 98
 # Runs the given block either in the foreground or as a detached process,
 # depending on @options['daemon'], and records the choice in @daemon.
 #
 # Foreground: the block runs in this process, which then exits with
 # EXIT_SUCCESS.
 #
 # Daemon: forks twice (calling Process.setsid in between); the innermost
 # child changes directory to Util.realpath('~'), clears the umask, runs the
 # block and exits with EXIT_SUCCESS. Both intermediate processes leave via
 # exit!.
 def daemon
 #--{{{
   unless @options['daemon']
     @daemon = false
     yield
     exit EXIT_SUCCESS
   end

   fork do
     Process.setsid
     fork do
       Dir.chdir(Util.realpath('~'))
       File.umask 0
       @daemon = true
       yield
       exit EXIT_SUCCESS
     end
     exit!
   end
   exit!
 #--}}}
 end

[Source]

    # File lib/rq-3.0.0/feeder.rb, line 41
# Main entry point of the feeder. Everything happens inside the block handed
# to #daemon: the pidfile is taken, logging and the queue are set up, runtime
# settings are read from @options (falling back to defval), a job runner
# daemon is started, and then the method loops forever starting and reaping
# jobs.
def feed
#--{{{
  daemon do
    gen_pidfile
    @main.init_logging
    @logger = @main.logger
    set_q

    # runtime identity and tunables
    @pid       = Process.pid
    @cmd       = @main.cmd
    @started   = Util.timestamp
    @min_sleep = Integer(@options['min_sleep'] || defval('min_sleep'))
    @max_sleep = Integer(@options['max_sleep'] || defval('max_sleep'))
    @max_feed  = Integer(@options['max_feed'] || defval('feed'))
    @children  = {}
    @jrd       = JobRunnerDaemon.daemon(@q)

    install_signal_handlers

    # announce the pid before install_redirects takes STDOUT away
    if @daemon && !@quiet
      STDOUT.puts "pid <#{ Process::pid }> started"
      STDOUT.flush
    end

    install_redirects

    info{ "** STARTED **" }
    info{ "version <#{ RQ::VERSION }>" }
    info{ "cmd <#{ @cmd }>" }
    info{ "pid <#{ @pid }>" }
    info{ "pidfile <#{ @pidfile.path }>" }
    info{ "jobrunnerdaemon uri <#{ @jrd.uri }> pid <#{ @jrd.pid }>" }
    info{ "qpath <#{ @qpath }>" }
    debug{ "mode <#{ @mode }>" }
    debug{ "max_feed <#{ @max_feed }>" }
    debug{ "min_sleep <#{ @min_sleep }>" }
    debug{ "max_sleep <#{ @max_sleep }>" }

    # one-off housekeeping before the first job is started
    transaction do
      fill_morgue
      reap_zombie_ios
    end

    # feed loop: each pass is rate limited by throttle(@min_sleep)
    loop do
      handle_signal if $rq_signaled
      throttle(@min_sleep) do
        start_jobs unless busy?
        nothing_running? ? relax : reap_jobs
      end
    end
  end
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 277
# Asks the queue for dead jobs relative to @started, reports each one to the
# queue via jobisdead, and logs whether it is buried or will be restarted
# (decided by the job's 'restartable' field). Runs inside a transaction.
def fill_morgue
#--{{{
  debug{ "filling morgue..." }
  transaction do
    @q.getdeadjobs(@started).each do |job|
      @q.jobisdead job
      if job['restartable']
        warn{ "dead job <#{ job['jid'] }> will be restarted" }
      else
        info{ "burried job <#{ job['jid'] }>" }
      end
    end
  end
  debug{ "filled morgue" }
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 474
# Stamps a reaped job with its completion data and logs the outcome.
#
# job    - job record; its 'started' field is read, and 'finished',
#          'elapsed', 'exit_status' and 'state' are written.
# status - object answering #exitstatus; if that call fails the exit status
#          is recorded as nil.
#
# A zero exit status is logged at info level, anything else at warn level.
def finish_job job, status
#--{{{
  job['finished'] = Util::timestamp(Time::now)
  job['elapsed'] = Util::stamptime(job['finished']) - Util::stamptime(job['started'])

  exit_code =
    begin
      status.exitstatus
    rescue
      nil
    end

  job['exit_status'] = exit_code
  job['state'] = 'finished'

  if exit_code == 0
    info{ "finished - jid <#{ job['jid'] }> pid <#{ job['pid'] }> exit_status <#{ job['exit_status'] }>" }
  else
    warn{ "finished - jid <#{ job['jid'] }> pid <#{ job['pid'] }> exit_status <#{ job['exit_status'] }>" }
  end
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 210
# Builds the path of a per-host, per-queue feeder file of the form
# "<dir>/.<host>_<flattened path>.feeder", where <dir> is
# Util::realpath('~') (presumably the user's home directory).
#
# path - optional name to derive the file from; defaults to
#        @options['name'] and then to @qpath.
#
# Returns the joined path.
def gen_feeder_name path = nil
#--{{{
  path ||= (@options['name'] || @qpath)

  # flatten the resolved path into a single file-name component
  flattened = Util::realpath(path).gsub(%r|/|, '_')

  # collapse runs of underscores; this has to be a regexp, because a plain
  # string pattern would only match the literal text "_+"
  basename = ".#{ Util::host }_#{ flattened }.feeder".gsub(%r/_+/, '_')

  dirname = Util::realpath '~'
  File::join dirname, basename
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 122
122:       def gen_pidfile name = nil
123: #--{{{
             # Creates and locks the feeder pidfile, refusing to continue when
             # another live process already owns it.
             #
             # name - accepted but not used anywhere in this method.
             #
             # On success @pidfile holds the open, locked file containing this
             # process's pid, and an at_exit hook removes, unlocks and closes
             # it. On failure the process exits (silently with EXIT_FAILURE
             # when @options['quiet'] is set, otherwise via abort with a
             # message).
124:         gen_pidfilepath
125: 
             # best effort: make sure the pidfile's directory exists
126:         begin
127:           FileUtils::mkdir_p(File::dirname(@pidfilepath))
128:         rescue
129:           nil
130:         end
131: 
132:         locked = nil 
133:         no_other_feeder = nil 
134: 
             # at most two attempts: the second one only happens after a stale
             # pidfile was removed below, or after an exception was reported
135:         2.times do
136:           locked = false
137:           no_other_feeder = false 
138: 
               # create the file exclusively, or fall back to opening the
               # existing one
139:           @pidfile = 
140:             begin
141:               open @pidfilepath, File::CREAT | File::EXCL | File::RDWR
142:             rescue
143:               open @pidfilepath, File::RDWR
144:             end
145: 
               # non-blocking exclusive lock; a return value of 0 is treated as
               # "lock obtained" (posixlock is defined outside this listing)
146:           ret = @pidfile.posixlock(File::LOCK_EX | File::LOCK_NB)
147:           locked = (ret == 0)
148: 
149:           begin
                 # nil when the file is empty or does not hold an integer
150:             pid = Integer(IO::read(@pidfilepath)) rescue nil
151: 
                 # no recorded pid: nobody else is registered
152:             unless pid
153:               no_other_feeder = true 
154:               break
155:             end
156: 
157:             if Util::alive?(pid)
                   # a live owner is acceptable only when it is this process
158:               no_other_feeder = Process::pid == pid ? true : false
159:                 #no_other_feeder = false 
160:               #else
161:                 #no_other_feeder = false 
162:               #end
163:               break
164:             else
                   # recorded owner is dead: keep going if we hold the lock,
                   # otherwise delete the stale file, pause briefly and retry
165:               no_other_feeder = true 
166:               STDERR.puts "WARNING : process <#{ pid }> died holding lock on <#{ @pidfilepath }>"
167:               STDERR.puts "WARNING : attempting autorecovery!"
168:               break if locked
169:               STDERR.puts "WARNING : your NFS locking setup is FUBAR - iptables or firewall issues!"
170:               STDERR.puts "WARNING : attempting autorecovery!"
171:               FileUtils::rm_f @pidfilepath 
                   # four sleeps of a random fraction of a second each
172:               4.times{ sleep rand }
173:             end
174: 
                 # any exception is reported and the loop moves on
                 # NOTE(review): the handle opened in a previous attempt is not
                 # closed before the next attempt reopens the file - confirm
                 # whether that is intentional
175:           rescue Exception => e
176:             STDERR.puts "WARNING : #{ e.message } (#{ e.class })"
177:           end
178:         end
179: 
180: 
             # both conditions are required: we hold the lock AND no other live
             # feeder is recorded
181:         unless(locked and no_other_feeder)
182:           pid = Integer(IO::read(@pidfilepath)) rescue 'UNKNOWN' 
183:           if @options['quiet']
184:             exit EXIT_FAILURE
185:           else
186:             abort "process <#{ pid }> is already feeding from this queue"
187:           end
188:         else
               # record our own pid, replacing any previous content
189:           @pidfile.chmod 0600 rescue nil
190:           @pidfile.rewind
191:           @pidfile.sync = true
192:           @pidfile.print Process::pid
193:           @pidfile.truncate @pidfile.pos
194:           @pidfile.flush
195: 
               # cleanup on exit; every step ignores its own failure
196:           at_exit do 
197:             FileUtils::rm_f @pidfilepath rescue nil
198:             @pidfile.posixlock File::LOCK_UN rescue nil
199:             @pidfile.close rescue nil
200:           end
201:         end
202: #--}}}
203:       end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 204
# Returns the pidfile path, computing it on first use as "pid" inside
# @dot_rq_dir and caching it in @pidfilepath.
def gen_pidfilepath
#--{{{
  @pidfilepath = File.join(@dot_rq_dir, 'pid') unless @pidfilepath
  @pidfilepath
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 322
# Acts on the signal flags held in $rq_sigterm, $rq_sigint and $rq_sighup.
#
# TERM/INT: reaps (without starting new jobs) until nothing is running, shuts
# the job runner daemon down, unlocks the pidfile and exits with
# EXIT_SUCCESS.
#
# HUP: reaps the same way, then shuts the job runner daemon down, unlocks the
# pidfile and re-executes @cmd; any failure along the way is logged as fatal
# and the process exits with EXIT_FAILURE.
def handle_signal
#--{{{
  if $rq_sigterm || $rq_sigint
    reap_jobs(true) until nothing_running?
    info{ "** STOPPING **" }
    begin
      @jrd.shutdown
    rescue
      nil
    end
    @pidfile.posixlock File::LOCK_UN
    exit EXIT_SUCCESS
  end

  return unless $rq_sighup

  reap_jobs(true) until nothing_running?
  info{ "** RESTARTING **" }
  info{ "** ARGV <#{ @cmd }> **" }
  begin
    begin
      @jrd.shutdown
    rescue
      nil
    end
    Util::uncache __FILE__
    @pidfile.posixlock File::LOCK_UN
    Util::exec @cmd
  rescue Exception => e
    fatal{"** FAILED TO RESTART! **"}
    fatal{ e }
    exit EXIT_FAILURE
  end
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 266
# When running as a daemon, points STDIN, STDOUT and STDERR at /dev/null.
# Does nothing in the foreground.
def install_redirects
#--{{{
  return unless @daemon

  open('/dev/null', 'r+') do |devnull|
    STDIN.reopen devnull
    STDOUT.reopen devnull
    STDERR.reopen devnull
  end
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 220
# Installs handlers for SIGHUP, SIGTERM and SIGINT.
#
# Daemon mode (or whenever ENV['RQ_SIGNALS'] is set): each handler records the
# signal name in $rq_signaled and in its own flag ($rq_sighup, $rq_sigterm or
# $rq_sigint). If nothing is running, handle_signal is called straight away;
# otherwise only the flags are left set. Finally the job runner daemon is
# asked to install its own handlers.
#
# Foreground mode: each of the three signals logs two warnings and exits
# without any cleanup.
def install_signal_handlers
#--{{{
  if @daemon or ENV['RQ_SIGNALS']
    $rq_signaled = false
    $rq_sighup = false
    $rq_sigterm = false
    $rq_sigint = false

    # shared tail of all three handlers
    react = lambda do |sig|
      if nothing_running?
        warn{ "signal <#{ sig }>" }
        handle_signal
      else
        warn{ "finishing running jobs before handling signal" }
      end
    end

    trap('SIGHUP') do
      $rq_signaled = $rq_sighup = 'SIGHUP'
      react.call 'SIGHUP'
    end
    trap('SIGTERM') do
      $rq_signaled = $rq_sigterm = 'SIGTERM'
      react.call 'SIGTERM'
    end
    trap('SIGINT') do
      $rq_signaled = $rq_sigint = 'SIGINT'
      react.call 'SIGINT'
    end

    @jrd.install_signal_handlers
  else
    # must be a word array: a plain %( ) string literal would not yield one
    # signal name per iteration
    %w(SIGHUP SIGTERM SIGINT).each do |sig|
      trap(sig) do
        warn{ "signal <#{ sig }>" }
        warn{ "not cleaning up - only daemon mode cleans up!" }
        exit
      end
    end
  end
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 411
# True when no child jobs are currently being tracked in @children.
def nothing_running?
#--{{{
  @children.size.zero?
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 416
416:       def reap_jobs reap_only = false, blocking = true
417: #--{{{
             # Collects finished children from the job runner daemon, records
             # them as done in the queue and (unless told otherwise) starts
             # replacement jobs.
             #
             # reap_only - when true, never start new jobs while reaping.
             # blocking  - when true, wait for a child; when false, poll once.
             #
             # Returns the array of child ids that were reaped.
             #
             # NOTE(review): @jrd.waitpid2 presumably mirrors Process.waitpid2
             # (returning [pid, status]) - confirm against JobRunnerDaemon.
418:         debug{ "reaping jobs..." }
419:         reaped = []
420: 
421:         cid = status = nil
422:     
423:         if blocking
               # saturated, or not allowed to start anything: just wait
424:           if busy? or reap_only
425:             cid, status = @jrd.waitpid2 -1, Process::WUNTRACED 
426:           else
                 # spare capacity: alternate non-blocking polls with attempts
                 # to start more jobs, pausing 4.2 seconds per round
427:             loop do
428:               debug{ "not busy - busywait loop" }
429:               cid, status = @jrd.waitpid2 -1, Process::WNOHANG | Process::WUNTRACED 
430:               break if cid
431:               start_jobs unless $rq_signaled
432:               break if busy?
433:               cid, status = @jrd.waitpid2 -1, Process::WNOHANG | Process::WUNTRACED 
434:               break if cid
435:               sleep 4.2 
436:             end
                 # the loop was left because we became busy: now wait
437:             cid, status = @jrd.waitpid2 -1, Process::WUNTRACED unless cid
438:           end
439:         else
               # single non-blocking poll
440:           cid, status = @jrd.waitpid2 -1, Process::WNOHANG | Process::WUNTRACED 
441:         end
442:     
443:         if cid and status
               # NOTE(review): @children[cid] is nil for an untracked cid and
               # finish_job writes into it - confirm that cannot happen
444:           job = @children[cid]
445:           finish_job job, status
446:     
               # one transaction covers this child plus any further children
               # that turn out to be finished already
447:           transaction do
448:             loopno = 0
449:             loop do
450:               @q.jobisdone job
451:               @children.delete cid
452:               reaped << cid
453:     
454:               start_jobs unless reap_only or $rq_signaled
455:     
                   # stop when nothing is left, or after more than 42 rounds
456:               if @children.size == 0 or loopno > 42
457:                 sleep 8 if loopno > 42 # wow - we are CRANKING through jobs so BACK OFF!!
458:                 break
459:               else
                     # short pause, then check for another finished child
460:                 sleep 0.1
461:                 cid, status = @jrd.waitpid2 -1, Process::WNOHANG | Process::WUNTRACED 
462:                 break unless cid and status
463:                 job = @children[cid]
464:                 finish_job job, status
465:               end
466:               loopno += 1
467:             end
468:           end
469:         end
470:         debug{ "<#{ reaped.size }> jobs reaped" }
471:         reaped
472: #--}}}
473:       end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 294
# Removes io files that no longer belong to any job in the queue.
#
# Every entry in the queue's stdin, stdout and stderr directories is expected
# to end in a job id. Entries whose trailing number is not among the jids in
# the jobs table are deleted; entries without a usable trailing number are
# skipped.
#
# This is a non-essential cleanup: any failure is logged as a warning and
# otherwise ignored.
def reap_zombie_ios
#--{{{
  debug{ "reaping zombie ios" }
  begin
    transaction do
      # set of job ids currently known to the queue
      known = {}
      @q.execute("select jid from jobs").each do |tuple|
        known[Integer(tuple.first)] = true
      end

      # word array, so that each name is sent to the queue separately
      %w[ stdin stdout stderr ].each do |d|
        Dir::glob(File::join(@q.send(d), "*")).each do |iof|
          begin
            # regexp, so that the trailing digits are actually extracted
            jid = Integer iof[%r/\d+\s*$/]
            unless known[jid]
              debug{ "removing zombie io <#{ iof }>" }
              FileUtils::rm_rf iof
            end
          rescue
            # no parseable job id in this name - leave the file alone
            next
          end
        end
      end
    end
  rescue Exception => e # because this is a non-essential function
    warn{ e }
  end
  debug{ "reaped" }
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 509
# Sleeps for a random whole number of seconds between @min_sleep and
# @max_sleep (both inclusive).
def relax
#--{{{
  span = @max_sleep - @min_sleep + 1
  seconds = @min_sleep + rand(span)
  debug{ "relaxing <#{ seconds }>" }
  sleep seconds
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 381
# Starts a single job through the job runner daemon.
#
# job - job record; 'state', 'started', 'runner', 'stdout' and 'stderr' are
#       filled in before the runner is created, and 'pid' once it has
#       started.
#
# On success the job is tracked in @children under its child id and reported
# to the queue via jobisrunning. If no runner or no child id is available the
# failure is logged and nothing is tracked.
#
# Returns the child id, or nil when the job was not started.
def start_job job
#--{{{
  jid = job['jid']

  #
  # state is set up slightly prematurely so the jobrunner will have it
  # available
  #
  job['state'] = 'running'
  job['started'] = Util::timestamp Time::now
  job['runner'] = Util::hostname

  job['stdout'] = @q.stdout4 jid
  job['stderr'] = @q.stderr4 jid

  jr = @jrd.runner job
  # only ask for the child id when a runner actually came back, so that a
  # missing runner reaches the error branch instead of raising
  cid = jr ? jr.cid : nil

  if jr and cid
    jr.run
    job['pid'] = cid
    @children[cid] = job
    @q.jobisrunning job
    info{ "started - jid <#{ job['jid'] }> pid <#{ job['pid'] }> command <#{ job['command'] }>" }
  else
    error{ "not started - jid <#{ job['jid'] }> command <#{ job['command'] }>" }
  end

  cid
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 366
# Pulls jobs from the queue and starts them until the feeder is busy or the
# queue has no job to hand out. Runs inside a transaction.
#
# Returns the number of jobs handed to start_job.
def start_jobs
#--{{{
  debug{ "starting jobs..." }
  launched = 0
  transaction do
    loop do
      break if busy?
      job = @q.getjob
      break unless job
      start_job job
      launched += 1
    end
  end
  debug{ "<#{ launched }> jobs started" }
  launched
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 349
# Rate limits the given block: when called again sooner than rate seconds
# after the previous throttled call, sleeps for the remainder plus a random
# extra drawn with rand(rate * 0.10), then yields.
#
# rate - seconds between calls (defaults to @min_sleep); a non-numeric or
#        non-positive value disables throttling and the block is simply run.
#
# Returns whatever the block returns.
def throttle rate = @min_sleep
#--{{{
  if rate.is_a?(Numeric) && rate > 0
    previous = defined?(@last_throttle_time) && @last_throttle_time
    if previous
      timeout = rate - (Time.now - previous)
      if timeout > 0
        timeout += rand(rate * 0.10)
        debug{ "throttle rate of <#{ rate }> exceeded - sleeping <#{ timeout }>" }
        sleep timeout
      end
    end
    @last_throttle_time = Time.now
  end
  yield
#--}}}
end

[Source]

     # File lib/rq-3.0.0/feeder.rb, line 488
# Runs the block inside a queue transaction. Calls may be nested: only the
# outermost call opens @q.transaction, inner calls just run their block.
#
# Returns the value of the block.
def transaction
#--{{{
  return yield if @in_transaction

  result = nil
  begin
    @in_transaction = true
    @q.transaction{ result = yield }
  ensure
    @in_transaction = false
  end
  result
#--}}}
end

[Validate]