lib/bj/runner.rb in ThiagoLelis-backgroundjob-1.0.4 vs lib/bj/runner.rb in ThiagoLelis-backgroundjob-1.0.5

- old
+ new

@@ -1,357 +1,357 @@ -class Bj - class Runner - class Background - def self.for(*a, &b) new(*a, &b) end - - attribute "command" - attribute "thread" - attribute "pid" - - def initialize command - @command = command - @thread = new_thread - end - - def inspect - { - "command" => command, - "pid" => pid, - }.inspect - end - - -# TODO - auto start runner? - - def new_thread - this = self - Thread.new do - Thread.current.abort_on_exception = true - loop do - cleanup = lambda{} - - IO.popen command, "r+" do |pipe| - this.pid = pid = pipe.pid - cleanup = lambda do - cleanup = lambda{} - begin; Process.kill(Runner.kill_signal, pid); rescue Exception; 42; end - end - at_exit &cleanup - Process.wait - end - - Bj.logger.error{ "#{ command } failed with #{ $?.inspect }" } unless - [0, 42].include?($?.exitstatus) - - cleanup.call - - sleep 42 - end - end - end - end - - module ClassMethods - attribute("thread"){ Thread.current } - attribute("hup_signal"){ Signal.list.keys.index("HUP") ? "HUP" : "ABRT" } - attribute("hup_signaled"){ false } - attribute("kill_signal"){ "TERM" } - attribute("kill_signaled"){ false } - - def tickle - return nil if Bj.config[Runner.no_tickle_key] - ping or start - end - - def ping - begin - pid = nil - uri = nil - process = nil - Bj.transaction do - pid = Bj.config[Runner.key(Process.pid)] || Bj.config[Runner.key] - uri = Bj.config["#{ pid }.uri"] - process = - if uri - require "drb" - # DRb.start_service "druby://localhost:0" - DRbObject.new(nil, uri) - else - Process - end - end - return nil unless pid - pid = Integer pid - begin - process.kill Runner.hup_signal, pid - pid - rescue Exception => e - false - end - rescue Exception => e - false - end - end - - def key ppid = 0 - ppid ||= 0 - "#{ Bj.rails_env }.#{ ppid }.pid" - end - - def no_tickle_key - "#{ Bj.rails_env }.no_tickle" - end - - def start options = {} - options.to_options! - background.delete Bj.rails_env if options[:force] - background[Bj.rails_env] ||= Background.for(command) - end - - def background - @background ||= Hash.new - end - - def background= value - @background ||= value - end - - def command - "#{ Bj.ruby } " + %W[ - #{ Bj.script } - run - --forever - --redirect=#{ log } - --ppid=#{ Process.pid } - --rails_env=#{ Bj.rails_env } - --rails_root=#{ Bj.rails_root } - ].map{|word| word.inspect}.join(" ") - end - - def log - File.join logdir, "bj.#{ Bj.hostname }.#{ Bj.rails_env }.log" - end - - def logdir - File.join File.expand_path(Bj.rails_root), 'log' - end - - def run options = {}, &block - new(options, &block).run - end - end - send :extend, ClassMethods - - module Instance_Methods - attribute "options" - attribute "block" - - def initialize options = {}, &block - options.to_options! - @options, @block = options, block - end - - def run - wait = options[:wait] || 42 - limit = options[:limit] - forever = options[:forever] - - limit = false if forever - wait = Integer wait - loopno = 0 - - Runner.thread = Thread.current - Bj.chroot - - register or exit!(EXIT::WARNING) - - Bj.logger.info{ "STARTED" } - at_exit{ Bj.logger.info{ "STOPPED" } } - - fill_morgue - install_signal_handlers - - loop do - ping_parent - - loopno += 1 - break if(limit and loopno > limit) - - archive_jobs - - catch :no_jobs do - loop do - job = thread = stdout = stderr = nil - - Bj.transaction(options) do - now = Util.now - - job = Bj::Table::Job.find_first_and_lock( - :conditions => ["state = ? and submitted_at <= ?", "pending", now], - :order => "priority DESC, submitted_at ASC", - :limit => 1, - :lock => true) - throw :no_jobs unless job - - - Bj.logger.info{ "#{ job.title } - started" } - - command = job.command - env = job.env || {} - stdin = job.stdin || '' - stdout = job.stdout || '' - stderr = job.stderr || '' - started_at = Util.now - - thread = Util.start command, :cwd=>Bj.rails_root, :env=>env, :stdin=>stdin, :stdout=>stdout, :stderr=>stderr - - job.state = "running" - job.runner = Bj.hostname - job.pid = thread.pid - job.started_at = started_at - job.save! - job.reload - end - - exit_status = thread.value - finished_at = Util.now - - Bj.transaction(options) do - job = Bj::Table::Job.find job.id - break unless job - job.state = "finished" - job.finished_at = finished_at - job.stdout = stdout - job.stderr = stderr - job.exit_status = exit_status - job.save! - job.reload - Bj.logger.info{ "#{ job.title } - exit_status=#{ job.exit_status }" } - end - end - end - - Runner.hup_signaled false - wait.times do - break if Runner.hup_signaled? - break if Runner.kill_signaled? - sleep 1 - end - - break unless(limit or limit == false) - break if Runner.kill_signaled? - end - end - - def ping_parent - ppid = options[:ppid] - return unless ppid - begin - Process.kill 0, Integer(ppid) - rescue Errno::ESRCH - Kernel.exit 42 - rescue Exception - 42 - end - end - - def install_signal_handlers - Runner.hup_signaled false - hup_handler = nil - hup_handler = - trap Runner.hup_signal do |*a| - begin - Runner.hup_signaled true - rescue Exception => e - Bj.logger.error{ e } rescue nil - end - hup_handler.call *a rescue nil - end - - Runner.kill_signaled false - kill_handler = nil - kill_handler = - trap Runner.kill_signal do |*a| - begin - Runner.kill_signaled true - rescue Exception => e - Bj.logger.error{ e } rescue nil - end - kill_handler.call *a rescue nil - end - - begin - trap("INT"){ exit } - rescue Exception - end - end - - def fill_morgue - Bj.transaction do - now = Util.now - jobs = Bj::Table::Job.find :all, - :conditions => ["state = 'running' and runner = ?", Bj.hostname] - jobs.each do |job| - if job.is_restartable? - Bj.logger.info{ "#{ job.title } - found dead and bloated but resubmitted" } - %w[ runner pid started_at finished_at stdout stderr exit_status ].each do |column| - job[column] = nil - end - job.state = 'pending' - else - Bj.logger.info{ "#{ job.title } - found dead and bloated" } - job.state = 'dead' - job.finished_at = now - end - job.save! - end - end - end - - def archive_jobs - Bj.transaction do - now = Util.now - too_old = now - Bj.ttl - jobs = Bj::Table::Job.find :all, - :conditions => ["(state = 'finished' or state = 'dead') and submitted_at < ?", too_old] - jobs.each do |job| - Bj.logger.info{ "#{ job.title } - archived" } - hash = job.to_hash.update(:archived_at => now) - Bj::Table::JobArchive.create! hash - job.destroy - end - end - end - - def register - Bj.transaction do - pid = Bj.config[key] - return false if Util.alive?(pid) - Bj.config[key] = Process.pid - unless Bj.util.ipc_signals_supported? # not winblows - require "drb" - DRb.start_service "druby://localhost:0", Process - Bj.config["#{ Process.pid }.uri"] = DRb.uri - end - end - at_exit{ unregister } - true - rescue Exception - false - end - - def unregister - Bj.transaction do - Bj.config.delete key - end - true - rescue Exception - false - end - - def key - @key ||= ( options[:ppid] ? Runner.key(options[:ppid]) : Runner.key ) - end - end - send :include, Instance_Methods - end -end +class Bj + class Runner + class Background + def self.for(*a, &b) new(*a, &b) end + + attribute "command" + attribute "thread" + attribute "pid" + + def initialize command + @command = command + @thread = new_thread + end + + def inspect + { + "command" => command, + "pid" => pid, + }.inspect + end + + +# TODO - auto start runner? + + def new_thread + this = self + Thread.new do + Thread.current.abort_on_exception = true + loop do + cleanup = lambda{} + + IO.popen command, "r+" do |pipe| + this.pid = pid = pipe.pid + cleanup = lambda do + cleanup = lambda{} + begin; Process.kill(Runner.kill_signal, pid); rescue Exception; 42; end + end + at_exit &cleanup + Process.wait + end + + Bj.logger.error{ "#{ command } failed with #{ $?.inspect }" } unless + [0, 42].include?($?.exitstatus) + + cleanup.call + + sleep 42 + end + end + end + end + + module ClassMethods + attribute("thread"){ Thread.current } + attribute("hup_signal"){ Signal.list.keys.index("HUP") ? "HUP" : "ABRT" } + attribute("hup_signaled"){ false } + attribute("kill_signal"){ "TERM" } + attribute("kill_signaled"){ false } + + def tickle + return nil if Bj.config[Runner.no_tickle_key] + ping or start + end + + def ping + begin + pid = nil + uri = nil + process = nil + Bj.transaction do + pid = Bj.config[Runner.key(Process.pid)] || Bj.config[Runner.key] + uri = Bj.config["#{ pid }.uri"] + process = + if uri + require "drb" + # DRb.start_service "druby://localhost:0" + DRbObject.new(nil, uri) + else + Process + end + end + return nil unless pid + pid = Integer pid + begin + process.kill Runner.hup_signal, pid + pid + rescue Exception => e + false + end + rescue Exception => e + false + end + end + + def key ppid = 0 + ppid ||= 0 + "#{ Bj.rails_env }.#{ ppid }.pid" + end + + def no_tickle_key + "#{ Bj.rails_env }.no_tickle" + end + + def start options = {} + options.to_options! + background.delete Bj.rails_env if options[:force] + background[Bj.rails_env] ||= Background.for(command) + end + + def background + @background ||= Hash.new + end + + def background= value + @background ||= value + end + + def command + "#{ Bj.ruby } " + %W[ + #{ Bj.script } + run + --forever + --redirect=#{ log } + --ppid=#{ Process.pid } + --rails_env=#{ Bj.rails_env } + --rails_root=#{ Bj.rails_root } + ].map{|word| word.inspect}.join(" ") + end + + def log + File.join logdir, "bj.#{ Bj.hostname }.#{ Bj.rails_env }.log" + end + + def logdir + File.join File.expand_path(Bj.rails_root), 'log' + end + + def run options = {}, &block + new(options, &block).run + end + end + send :extend, ClassMethods + + module Instance_Methods + attribute "options" + attribute "block" + + def initialize options = {}, &block + options.to_options! + @options, @block = options, block + end + + def run + wait = options[:wait] || 42 + limit = options[:limit] + forever = options[:forever] + + limit = false if forever + wait = Integer wait + loopno = 0 + + Runner.thread = Thread.current + Bj.chroot + + register or exit!(EXIT::WARNING) + + Bj.logger.info{ "STARTED" } + at_exit{ Bj.logger.info{ "STOPPED" } } + + fill_morgue + install_signal_handlers + + loop do + ping_parent + + loopno += 1 + break if(limit and loopno > limit) + + archive_jobs + + catch :no_jobs do + loop do + job = thread = stdout = stderr = nil + + Bj.transaction(options) do + now = Util.now + + job = Bj::Table::Job.find_first_and_lock( + :conditions => ["state = ? and submitted_at <= ?", "pending", now], + :order => "priority DESC, submitted_at ASC", + :limit => 1, + :lock => true) + throw :no_jobs unless job + + + Bj.logger.info{ "#{ job.title } - started" } + + command = job.command + env = job.env || {} + stdin = job.stdin || '' + stdout = job.stdout || '' + stderr = job.stderr || '' + started_at = Util.now + + thread = Util.start command, :cwd=>Bj.rails_root, :env=>env, :stdin=>stdin, :stdout=>stdout, :stderr=>stderr + + job.state = "running" + job.runner = Bj.hostname + job.pid = thread.pid + job.started_at = started_at + job.save! + job.reload + end + + exit_status = thread.value + finished_at = Util.now + + Bj.transaction(options) do + job = Bj::Table::Job.find job.id + break unless job + job.state = "finished" + job.finished_at = finished_at + job.stdout = stdout + job.stderr = stderr + job.exit_status = exit_status + job.save! + job.reload + Bj.logger.info{ "#{ job.title } - exit_status=#{ job.exit_status }" } + end + end + end + + Runner.hup_signaled false + wait.times do + break if Runner.hup_signaled? + break if Runner.kill_signaled? + sleep 1 + end + + break unless(limit or limit == false) + break if Runner.kill_signaled? + end + end + + def ping_parent + ppid = options[:ppid] + return unless ppid + begin + Process.kill 0, Integer(ppid) + rescue Errno::ESRCH + Kernel.exit 42 + rescue Exception + 42 + end + end + + def install_signal_handlers + Runner.hup_signaled false + hup_handler = nil + hup_handler = + trap Runner.hup_signal do |*a| + begin + Runner.hup_signaled true + rescue Exception => e + Bj.logger.error{ e } rescue nil + end + hup_handler.call *a rescue nil + end + + Runner.kill_signaled false + kill_handler = nil + kill_handler = + trap Runner.kill_signal do |*a| + begin + Runner.kill_signaled true + rescue Exception => e + Bj.logger.error{ e } rescue nil + end + kill_handler.call *a rescue nil + end + + begin + trap("INT"){ exit } + rescue Exception + end + end + + def fill_morgue + Bj.transaction do + now = Util.now + jobs = Bj::Table::Job.find :all, + :conditions => ["state = 'running' and runner = ?", Bj.hostname] + jobs.each do |job| + if job.is_restartable? + Bj.logger.info{ "#{ job.title } - found dead and bloated but resubmitted" } + %w[ runner pid started_at finished_at stdout stderr exit_status ].each do |column| + job[column] = nil + end + job.state = 'pending' + else + Bj.logger.info{ "#{ job.title } - found dead and bloated" } + job.state = 'dead' + job.finished_at = now + end + job.save! + end + end + end + + def archive_jobs + Bj.transaction do + now = Util.now + too_old = now - Bj.ttl + jobs = Bj::Table::Job.find :all, + :conditions => ["(state = 'finished' or state = 'dead') and submitted_at < ?", too_old] + jobs.each do |job| + Bj.logger.info{ "#{ job.title } - archived" } + hash = job.to_hash.update(:archived_at => now) + Bj::Table::JobArchive.create! hash + job.destroy + end + end + end + + def register + Bj.transaction do + pid = Bj.config[key] + return false if Util.alive?(pid) + Bj.config[key] = Process.pid + unless Bj.util.ipc_signals_supported? # not winblows + require "drb" + DRb.start_service "druby://localhost:0", Process + Bj.config["#{ Process.pid }.uri"] = DRb.uri + end + end + at_exit{ unregister } + true + rescue Exception + false + end + + def unregister + Bj.transaction do + Bj.config.delete key + end + true + rescue Exception + false + end + + def key + @key ||= ( options[:ppid] ? Runner.key(options[:ppid]) : Runner.key ) + end + end + send :include, Instance_Methods + end +end