In: |
lib/rq-3.0.0/feeder.rb
|
Parent: | MainHelper |
The Feeder class is responsible for running jobs from a queue — or ‘feeding’ from that queue. Its mode of operation is essentially to run jobs as quickly as possible, return them to the queue, and then run more jobs if any exist. If no jobs exist, the Feeder periodically polls the queue to see whether any new jobs have arrived.
DEFAULT_MIN_SLEEP | = | 42 |
DEFAULT_MAX_SLEEP | = | 240 |
DEFAULT_FEED | = | 2 |
feed | [RW] | |
max_sleep | [RW] | |
min_sleep | [RW] | |
# File lib/rq-3.0.0/feeder.rb, line 504
# True once the number of tracked child jobs (@children) has reached the
# feed limit (@max_feed); callers use it to stop starting new jobs.
def busy?
  #--{{{
  !(@children.size < @max_feed)
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 98
# Runs the given block either in the foreground or as a detached daemon,
# depending on @options['daemon']; never returns to the caller.
#
# Foreground: sets @daemon = false, yields, then exits with EXIT_SUCCESS.
# Daemon: double-forks.  The intermediate child calls Process::setsid and
# forks the real worker, which chdirs to the home directory, clears the
# umask, sets @daemon = true, yields and exits with EXIT_SUCCESS.  The
# original process and the intermediate child leave via exit!, which skips
# at_exit handlers.
def daemon
  #--{{{
  unless @options['daemon']
    @daemon = false
    yield
    exit EXIT_SUCCESS
  end

  fork do
    Process::setsid
    fork do
      Dir::chdir(Util.realpath('~'))
      File::umask 0
      @daemon = true
      yield
      exit EXIT_SUCCESS
    end
    exit!
  end
  exit!
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 41
# Main entry point.  Inside `daemon` (foreground or detached) it:
#   * takes the pidfile lock, initialises logging and the queue (set_q),
#   * reads min_sleep / max_sleep / max_feed from @options (falling back to
#     defval), creates the job runner daemon and installs signal handlers,
#   * performs one-off housekeeping (fill_morgue, reap_zombie_ios) in a
#     single transaction,
#   * then loops forever: handle any pending signal, and - throttled to at
#     most one pass per min_sleep seconds - start jobs while not busy, then
#     either relax (nothing running) or reap finished jobs.
def feed
  #--{{{
  daemon do
    gen_pidfile
    @main.init_logging
    @logger = @main.logger
    set_q

    @pid = Process.pid
    @cmd = @main.cmd
    @started = Util.timestamp
    @min_sleep = Integer(@options['min_sleep'] || defval('min_sleep'))
    @max_sleep = Integer(@options['max_sleep'] || defval('max_sleep'))
    @max_feed = Integer(@options['max_feed'] || defval('feed'))
    @children = Hash.new
    @jrd = JobRunnerDaemon.daemon(@q)

    install_signal_handlers

    if @daemon && !@quiet
      STDOUT.puts "pid <#{ Process::pid }> started"
      STDOUT.flush
    end

    install_redirects

    info{ "** STARTED **" }
    info{ "version <#{ RQ::VERSION }>" }
    info{ "cmd <#{ @cmd }>" }
    info{ "pid <#{ @pid }>" }
    info{ "pidfile <#{ @pidfile.path }>" }
    info{ "jobrunnerdaemon uri <#{ @jrd.uri }> pid <#{ @jrd.pid }>" }
    info{ "qpath <#{ @qpath }>" }
    debug{ "mode <#{ @mode }>" }
    debug{ "max_feed <#{ @max_feed }>" }
    debug{ "min_sleep <#{ @min_sleep }>" }
    debug{ "max_sleep <#{ @max_sleep }>" }

    # startup housekeeping, done once inside a single transaction
    transaction do
      fill_morgue
      reap_zombie_ios
    end

    loop do
      handle_signal if $rq_signaled
      throttle(@min_sleep) do
        start_jobs unless busy?
        nothing_running? ? relax : reap_jobs
      end
    end
  end
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 277
# Marks every job returned by @q.getdeadjobs(@started) as dead via
# @q.jobisdead, inside a transaction.  Restartable jobs are logged as a
# warning, others as "burried" at info level.
# NOTE(review): whether a restartable job is actually requeued is decided by
# @q.jobisdead, which is not visible here.
def fill_morgue
  #--{{{
  debug{ "filling morgue..." }
  transaction do
    @q.getdeadjobs(@started).each do |dead|
      @q.jobisdead dead
      if dead['restartable']
        warn{ "dead job <#{ dead['jid'] }> will be restarted" }
      else
        info{ "burried job <#{ dead['jid'] }>" }
      end
    end
  end
  debug{ "filled morgue" }
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 474
# Records completion data on +job+: finish timestamp, elapsed time (via
# Util::stamptime of 'finished' minus 'started'), exit status and
# state = 'finished'.
#
# status - process status object; if it cannot answer #exitstatus (raises
#          a StandardError), the exit status is recorded as nil.
#
# A zero exit status is logged at info level, anything else (including nil)
# as a warning.
def finish_job job, status
  #--{{{
  job['finished'] = Util.timestamp(Time.now)
  job['elapsed'] = Util.stamptime(job['finished']) - Util.stamptime(job['started'])

  code =
    begin
      status.exitstatus
    rescue
      nil
    end

  job['exit_status'] = code
  job['state'] = 'finished'

  if code == 0
    info{ "finished - jid <#{ job['jid'] }> pid <#{ job['pid'] }> exit_status <#{ job['exit_status'] }>" }
  else
    warn{ "finished - jid <#{ job['jid'] }> pid <#{ job['pid'] }> exit_status <#{ job['exit_status'] }>" }
  end
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 210
# Builds the path of a per-host, per-queue feeder file located in the
# directory Util::realpath('~') resolves to, named
# ".<host>_<queue path with '/' replaced by '_'>.feeder" with runs of '_'
# collapsed to one (e.g. host "h", path "/a/b" -> ".h_a_b.feeder").
#
# path - queue path to derive the name from; defaults to @options['name']
#        or, failing that, @qpath.
def gen_feeder_name path = nil
  #--{{{
  path ||= (@options['name'] || @qpath)
  # the old %|/|o and %/_+/ arguments were not regexps (%|...| and %/.../ are
  # String literals, and the trailing `o` was a syntax error), so slashes and
  # repeated underscores were never substituted as intended
  path = Util::realpath(path).tr('/', '_')
  basename = ".#{ Util::host }_#{ path }.feeder".squeeze('_')
  dirname = Util::realpath '~'
  File::join dirname, basename
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 122
# Acquires the feeder's pidfile (path from gen_pidfilepath) so that only one
# feeder runs per queue.
#
# The file is opened (created if missing), an exclusive non-blocking
# posixlock is attempted, and any pid already recorded in it is checked:
#   * no pid recorded            -> ours to take;
#   * live pid                   -> ours only if it is this process;
#   * dead pid and lock acquired -> stale file, take it over;
#   * dead pid, lock not acquired -> remove the file, back off randomly and
#     retry once (broken NFS locking).
# If the file cannot be claimed the process exits: silently with
# EXIT_FAILURE when @options['quiet'], otherwise via abort.  On success the
# current pid is written to the file and an at_exit hook removes, unlocks
# and closes it.
#
# name - unused; kept for interface compatibility.
def gen_pidfile name = nil
  #--{{{
  gen_pidfilepath

  begin
    FileUtils::mkdir_p(File::dirname(@pidfilepath))
  rescue
    nil
  end

  locked = nil
  no_other_feeder = nil

  2.times do
    locked = false
    no_other_feeder = false

    # close the handle from a previous attempt instead of leaking it; a
    # leaked fd closed later by GC would release this process's POSIX lock
    @pidfile.close if @pidfile && !@pidfile.closed?

    # File::open rather than Kernel#open: never treat the path as a command
    @pidfile =
      begin
        File::open @pidfilepath, File::CREAT | File::EXCL | File::RDWR
      rescue
        File::open @pidfilepath, File::RDWR
      end

    ret = @pidfile.posixlock(File::LOCK_EX | File::LOCK_NB)
    locked = (ret == 0)

    begin
      pid = Integer(IO::read(@pidfilepath)) rescue nil

      unless pid
        no_other_feeder = true
        break
      end

      if Util::alive?(pid)
        # a live pid only leaves the queue free if it is our own
        no_other_feeder = (Process::pid == pid)
        break
      else
        no_other_feeder = true
        STDERR.puts "WARNING : process <#{ pid }> died holding lock on <#{ @pidfilepath }>"
        STDERR.puts "WARNING : attempting autorecovery!"
        break if locked
        STDERR.puts "WARNING : your NFS locking setup is FUBAR - iptables or firewall issues!"
        STDERR.puts "WARNING : attempting autorecovery!"
        FileUtils::rm_f @pidfilepath
        4.times{ sleep rand }
      end

    # StandardError only: rescuing Exception swallowed Interrupt/SystemExit,
    # and when called from feed no signal handlers are installed yet
    rescue StandardError => e
      STDERR.puts "WARNING : #{ e.message } (#{ e.class })"
    end
  end

  if locked and no_other_feeder
    @pidfile.chmod 0600 rescue nil
    @pidfile.rewind
    @pidfile.sync = true
    @pidfile.print Process::pid
    # drop any trailing bytes of a longer pid written by a previous owner
    @pidfile.truncate @pidfile.pos
    @pidfile.flush

    at_exit do
      FileUtils::rm_f @pidfilepath rescue nil
      @pidfile.posixlock File::LOCK_UN rescue nil
      @pidfile.close rescue nil
    end
  else
    pid = Integer(IO::read(@pidfilepath)) rescue 'UNKNOWN'
    if @options['quiet']
      exit EXIT_FAILURE
    else
      abort "process <#{ pid }> is already feeding from this queue"
    end
  end
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 204
# Returns the pidfile path, computing it once as "<@dot_rq_dir>/pid" and
# memoizing it in @pidfilepath.
# (An older variant derived it from gen_feeder_name instead.)
def gen_pidfilepath
  #--{{{
  @pidfilepath = File.join(@dot_rq_dir, 'pid') unless @pidfilepath
  @pidfilepath
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 322
# Acts on signals recorded in $rq_sigterm / $rq_sigint / $rq_sighup.  It is
# called from the feed loop when $rq_signaled is set, and directly from the
# trap handlers when nothing is running.
#
# TERM/INT: reap all running jobs, shut down the job runner daemon, release
#           the pidfile lock and exit with EXIT_SUCCESS.
# HUP:      reap all running jobs, shut down the job runner daemon, release
#           the lock and re-exec the original command; a failed restart is
#           logged as fatal and exits with EXIT_FAILURE.
# Returns nil when no signal flag is set.
def handle_signal
  #--{{{
  drain = lambda{ reap_jobs(true) until nothing_running? }

  if $rq_sigterm || $rq_sigint
    drain.call
    info{ "** STOPPING **" }
    @jrd.shutdown rescue nil
    @pidfile.posixlock File::LOCK_UN
    exit EXIT_SUCCESS
  end

  return unless $rq_sighup

  drain.call
  info{ "** RESTARTING **" }
  info{ "** ARGV <#{ @cmd }> **" }
  begin
    @jrd.shutdown rescue nil
    Util::uncache __FILE__
    @pidfile.posixlock File::LOCK_UN
    Util::exec @cmd
  rescue Exception => e
    fatal{"** FAILED TO RESTART! **"}
    fatal{ e }
    exit EXIT_FAILURE
  end
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 266
# In daemon mode, reattaches STDIN, STDOUT and STDERR to /dev/null so the
# detached process holds no terminal; outside daemon mode it does nothing
# and returns nil.
def install_redirects
  #--{{{
  return unless @daemon

  File.open('/dev/null', 'r+') do |devnull|
    [STDIN, STDOUT, STDERR].each{ |io| io.reopen devnull }.last
  end
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 220
# Installs HUP/TERM/INT handlers.
#
# Daemon mode (or ENV['RQ_SIGNALS'] set): each handler records the signal in
# $rq_signaled and its own $rq_sig* flag.  If nothing is running it calls
# handle_signal immediately; otherwise it only logs, and the feed loop
# (which checks $rq_signaled) handles it once running jobs are reaped.  The
# job runner daemon's own handlers are installed too.
#
# Otherwise: each signal just logs and exits without cleanup.
def install_signal_handlers
  #--{{{
  if @daemon or ENV['RQ_SIGNALS']
    $rq_signaled = false
    $rq_sighup = false
    $rq_sigterm = false
    $rq_sigint = false

    # shared tail of all three handlers
    deliver = lambda do |sig|
      if nothing_running?
        warn{ "signal <#{ sig }>" }
        handle_signal
      else
        warn{ "finishing running jobs before handling signal" }
      end
    end

    trap('SIGHUP') do
      $rq_signaled = $rq_sighup = 'SIGHUP'
      deliver.call 'SIGHUP'
    end
    trap('SIGTERM') do
      $rq_signaled = $rq_sigterm = 'SIGTERM'
      deliver.call 'SIGTERM'
    end
    trap('SIGINT') do
      $rq_signaled = $rq_sigint = 'SIGINT'
      deliver.call 'SIGINT'
    end
    @jrd.install_signal_handlers
  else
    # %w, not %( ): a %() literal is one String, not a list of signal names,
    # so the old `.each` failed instead of trapping each signal
    %w(SIGHUP SIGTERM SIGINT).each do |sig|
      trap(sig) do
        warn{ "signal <#{ sig }>" }
        warn{ "not cleaning up - only daemon mode cleans up!" }
        exit
      end
    end
  end
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 411
# True when no child jobs are currently tracked in @children.
def nothing_running?
  #--{{{
  @children.empty?
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 416
# Waits for finished child jobs, marks them done in the queue and returns
# the array of reaped child pids.
#
# reap_only - when true, wait for a child without starting new jobs
#             (used while draining on shutdown/restart).
# blocking  - when false, poll once with WNOHANG and return immediately.
#
# When blocking and not busy, it busy-waits: it polls for a finished child,
# tops up jobs via start_jobs (unless a signal is pending) and sleeps 4.2s,
# until a child finishes or the feeder becomes busy, at which point it
# blocks.  After the first reap it keeps collecting further finished
# children (polling every 0.1s) in one transaction.  After more than 42
# extra rounds it sleeps 8s and stops, to back off under heavy load.
def reap_jobs reap_only = false, blocking = true
  #--{{{
  debug{ "reaping jobs..." }
  harvested = []

  wait_flags = Process::WUNTRACED
  poll_flags = Process::WNOHANG | Process::WUNTRACED

  cid = status = nil

  if !blocking
    cid, status = @jrd.waitpid2(-1, poll_flags)
  elsif busy? or reap_only
    cid, status = @jrd.waitpid2(-1, wait_flags)
  else
    loop do
      debug{ "not busy - busywait loop" }
      cid, status = @jrd.waitpid2(-1, poll_flags)
      break if cid
      start_jobs unless $rq_signaled
      break if busy?
      cid, status = @jrd.waitpid2(-1, poll_flags)
      break if cid
      sleep 4.2
    end
    cid, status = @jrd.waitpid2(-1, wait_flags) unless cid
  end

  if cid and status
    job = @children[cid]
    finish_job job, status

    transaction do
      rounds = 0
      loop do
        @q.jobisdone job
        @children.delete cid
        harvested << cid

        start_jobs unless reap_only or $rq_signaled

        if rounds > 42
          sleep 8 # wow - we are CRANKING through jobs so BACK OFF!!
          break
        end
        break if @children.empty?

        sleep 0.1
        cid, status = @jrd.waitpid2(-1, poll_flags)
        break unless cid and status
        job = @children[cid]
        finish_job job, status
        rounds += 1
      end
    end
  end

  debug{ "<#{ harvested.size }> jobs reaped" }
  harvested
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 294
# Best-effort cleanup: deletes files in the queue's stdin/stdout/stderr
# directories whose trailing number does not match any jid in the jobs
# table.  Files without a trailing number, or that fail to delete, are
# skipped.  Errors from the whole pass are logged as warnings, not raised.
def reap_zombie_ios
  #--{{{
  debug{ "reaping zombie ios" }
  begin
    transaction do
      jids = @q.execute("select jid from jobs").map{ |tuple| Integer tuple.first }
      live = {}
      jids.each{ |jid| live[jid] = true }

      # %w / a real regexp: the old %[ ... ] and %/ ... / literals were plain
      # Strings, so the directory list and jid extraction never worked
      %w[ stdin stdout stderr ].each do |d|
        Dir::glob(File::join(@q.send(d), "*")).each do |iof|
          begin
            jid = Integer(iof[/\d+\s*\z/])
            unless live[jid]
              debug{ "removing zombie io <#{ iof }>" }
              FileUtils::rm_rf iof
            end
          rescue
            next
          end
        end
      end
    end
  # non-essential function: log and carry on, but let Interrupt/SystemExit through
  rescue StandardError => e
    warn{ e }
  end
  debug{ "reaped" }
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 509
# Sleeps for a random whole number of seconds in [@min_sleep, @max_sleep].
# If @max_sleep <= @min_sleep it sleeps exactly @min_sleep.
def relax
  #--{{{
  spread = @max_sleep - @min_sleep
  # guard: with max_sleep < min_sleep, rand(spread + 1) got a zero or
  # negative bound, so Kernel#rand returned a float in [0, 1) or used the
  # bound's absolute value, giving sleeps outside the configured range
  seconds = spread > 0 ? rand(spread + 1) + @min_sleep : @min_sleep
  debug{ "relaxing <#{ seconds }>" }
  sleep seconds
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 381
# Starts a single job through the job runner daemon.
#
# The job's state, start time, runner host and stdout/stderr paths are set
# before the runner is created so the runner can see them.  If a runner
# with a child pid is obtained, it is run, the job is tracked in @children
# under that pid and reported to the queue as running.  Otherwise an error
# is logged.
#
# Returns the child pid, or nil if the job could not be started.
def start_job job
  #--{{{
  jid = job['jid']

  #
  # we setup state slightly prematurely so jobrunner will have it availible
  #
  job['state'] = 'running'
  job['started'] = Util::timestamp Time::now
  job['runner'] = Util::hostname

  job['stdout'] = @q.stdout4 jid
  job['stderr'] = @q.stderr4 jid

  jr = @jrd.runner job
  # check the runner before asking it for a cid: the old code called jr.cid
  # first, so a nil runner raised NoMethodError instead of being logged
  cid = jr ? jr.cid : nil

  if cid
    jr.run
    job['pid'] = cid
    @children[cid] = job
    @q.jobisrunning job
    info{ "started - jid <#{ job['jid'] }> pid <#{ job['pid'] }> command <#{ job['command'] }>" }
  else
    error{ "not started - jid <#{ job['jid'] }> command <#{ job['command'] }>" }
  end

  cid
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 366
# Within one transaction, pulls jobs from the queue with @q.getjob and
# starts them until the feeder is busy or the queue has no job to give.
# Returns the number of jobs started.
def start_jobs
  #--{{{
  debug{ "starting jobs..." }
  count = 0
  transaction do
    while !busy? && (job = @q.getjob)
      start_job job
      count += 1
    end
  end
  debug{ "<#{ count }> jobs started" }
  count
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 349
# Runs the block at most once per +rate+ seconds.  If the previous call was
# less than +rate+ seconds ago, it first sleeps for the remainder plus a
# random jitter of up to 10% of +rate+.  A non-numeric or non-positive
# +rate+ disables throttling.  Returns the block's value.
def throttle rate = @min_sleep
  #--{{{
  if Numeric === rate and rate > 0
    if defined? @last_throttle_time and @last_throttle_time
      elapsed = Time.now - @last_throttle_time
      timeout = rate - elapsed
      if timeout > 0
        # uniform jitter in [0, 10% of rate); rand(rate * 0.10) truncated the
        # bound to an integer and, for rate < 10, fell back to rand(0), i.e.
        # up to a whole second regardless of rate
        timeout = timeout + rand * rate * 0.10
        debug{ "throttle rate of <#{ rate }> exceeded - sleeping <#{ timeout }>" }
        sleep timeout
      end
    end
    @last_throttle_time = Time.now
  end
  yield
  #--}}}
end
# File lib/rq-3.0.0/feeder.rb, line 488
# Runs the block inside a queue transaction (@q.transaction) and returns
# the block's value.  Nested calls simply yield, so only the outermost call
# opens a transaction.  The @in_transaction flag is cleared even when the
# block raises.
def transaction
  #--{{{
  return yield if @in_transaction

  result = nil
  begin
    @in_transaction = true
    @q.transaction{ result = yield }
  ensure
    @in_transaction = false
  end
  result
  #--}}}
end