lib/bj/runner.rb in ThiagoLelis-backgroundjob-1.0.4 vs lib/bj/runner.rb in ThiagoLelis-backgroundjob-1.0.5
- old
+ new
@@ -1,357 +1,357 @@
-class Bj
- class Runner
- class Background
- def self.for(*a, &b) new(*a, &b) end
-
- attribute "command"
- attribute "thread"
- attribute "pid"
-
- def initialize command
- @command = command
- @thread = new_thread
- end
-
- def inspect
- {
- "command" => command,
- "pid" => pid,
- }.inspect
- end
-
-
-# TODO - auto start runner?
-
- def new_thread
- this = self
- Thread.new do
- Thread.current.abort_on_exception = true
- loop do
- cleanup = lambda{}
-
- IO.popen command, "r+" do |pipe|
- this.pid = pid = pipe.pid
- cleanup = lambda do
- cleanup = lambda{}
- begin; Process.kill(Runner.kill_signal, pid); rescue Exception; 42; end
- end
- at_exit &cleanup
- Process.wait
- end
-
- Bj.logger.error{ "#{ command } failed with #{ $?.inspect }" } unless
- [0, 42].include?($?.exitstatus)
-
- cleanup.call
-
- sleep 42
- end
- end
- end
- end
-
- module ClassMethods
- attribute("thread"){ Thread.current }
- attribute("hup_signal"){ Signal.list.keys.index("HUP") ? "HUP" : "ABRT" }
- attribute("hup_signaled"){ false }
- attribute("kill_signal"){ "TERM" }
- attribute("kill_signaled"){ false }
-
- def tickle
- return nil if Bj.config[Runner.no_tickle_key]
- ping or start
- end
-
- def ping
- begin
- pid = nil
- uri = nil
- process = nil
- Bj.transaction do
- pid = Bj.config[Runner.key(Process.pid)] || Bj.config[Runner.key]
- uri = Bj.config["#{ pid }.uri"]
- process =
- if uri
- require "drb"
- # DRb.start_service "druby://localhost:0"
- DRbObject.new(nil, uri)
- else
- Process
- end
- end
- return nil unless pid
- pid = Integer pid
- begin
- process.kill Runner.hup_signal, pid
- pid
- rescue Exception => e
- false
- end
- rescue Exception => e
- false
- end
- end
-
- def key ppid = 0
- ppid ||= 0
- "#{ Bj.rails_env }.#{ ppid }.pid"
- end
-
- def no_tickle_key
- "#{ Bj.rails_env }.no_tickle"
- end
-
- def start options = {}
- options.to_options!
- background.delete Bj.rails_env if options[:force]
- background[Bj.rails_env] ||= Background.for(command)
- end
-
- def background
- @background ||= Hash.new
- end
-
- def background= value
- @background ||= value
- end
-
- def command
- "#{ Bj.ruby } " + %W[
- #{ Bj.script }
- run
- --forever
- --redirect=#{ log }
- --ppid=#{ Process.pid }
- --rails_env=#{ Bj.rails_env }
- --rails_root=#{ Bj.rails_root }
- ].map{|word| word.inspect}.join(" ")
- end
-
- def log
- File.join logdir, "bj.#{ Bj.hostname }.#{ Bj.rails_env }.log"
- end
-
- def logdir
- File.join File.expand_path(Bj.rails_root), 'log'
- end
-
- def run options = {}, &block
- new(options, &block).run
- end
- end
- send :extend, ClassMethods
-
- module Instance_Methods
- attribute "options"
- attribute "block"
-
- def initialize options = {}, &block
- options.to_options!
- @options, @block = options, block
- end
-
- def run
- wait = options[:wait] || 42
- limit = options[:limit]
- forever = options[:forever]
-
- limit = false if forever
- wait = Integer wait
- loopno = 0
-
- Runner.thread = Thread.current
- Bj.chroot
-
- register or exit!(EXIT::WARNING)
-
- Bj.logger.info{ "STARTED" }
- at_exit{ Bj.logger.info{ "STOPPED" } }
-
- fill_morgue
- install_signal_handlers
-
- loop do
- ping_parent
-
- loopno += 1
- break if(limit and loopno > limit)
-
- archive_jobs
-
- catch :no_jobs do
- loop do
- job = thread = stdout = stderr = nil
-
- Bj.transaction(options) do
- now = Util.now
-
- job = Bj::Table::Job.find_first_and_lock(
- :conditions => ["state = ? and submitted_at <= ?", "pending", now],
- :order => "priority DESC, submitted_at ASC",
- :limit => 1,
- :lock => true)
- throw :no_jobs unless job
-
-
- Bj.logger.info{ "#{ job.title } - started" }
-
- command = job.command
- env = job.env || {}
- stdin = job.stdin || ''
- stdout = job.stdout || ''
- stderr = job.stderr || ''
- started_at = Util.now
-
- thread = Util.start command, :cwd=>Bj.rails_root, :env=>env, :stdin=>stdin, :stdout=>stdout, :stderr=>stderr
-
- job.state = "running"
- job.runner = Bj.hostname
- job.pid = thread.pid
- job.started_at = started_at
- job.save!
- job.reload
- end
-
- exit_status = thread.value
- finished_at = Util.now
-
- Bj.transaction(options) do
- job = Bj::Table::Job.find job.id
- break unless job
- job.state = "finished"
- job.finished_at = finished_at
- job.stdout = stdout
- job.stderr = stderr
- job.exit_status = exit_status
- job.save!
- job.reload
- Bj.logger.info{ "#{ job.title } - exit_status=#{ job.exit_status }" }
- end
- end
- end
-
- Runner.hup_signaled false
- wait.times do
- break if Runner.hup_signaled?
- break if Runner.kill_signaled?
- sleep 1
- end
-
- break unless(limit or limit == false)
- break if Runner.kill_signaled?
- end
- end
-
- def ping_parent
- ppid = options[:ppid]
- return unless ppid
- begin
- Process.kill 0, Integer(ppid)
- rescue Errno::ESRCH
- Kernel.exit 42
- rescue Exception
- 42
- end
- end
-
- def install_signal_handlers
- Runner.hup_signaled false
- hup_handler = nil
- hup_handler =
- trap Runner.hup_signal do |*a|
- begin
- Runner.hup_signaled true
- rescue Exception => e
- Bj.logger.error{ e } rescue nil
- end
- hup_handler.call *a rescue nil
- end
-
- Runner.kill_signaled false
- kill_handler = nil
- kill_handler =
- trap Runner.kill_signal do |*a|
- begin
- Runner.kill_signaled true
- rescue Exception => e
- Bj.logger.error{ e } rescue nil
- end
- kill_handler.call *a rescue nil
- end
-
- begin
- trap("INT"){ exit }
- rescue Exception
- end
- end
-
- def fill_morgue
- Bj.transaction do
- now = Util.now
- jobs = Bj::Table::Job.find :all,
- :conditions => ["state = 'running' and runner = ?", Bj.hostname]
- jobs.each do |job|
- if job.is_restartable?
- Bj.logger.info{ "#{ job.title } - found dead and bloated but resubmitted" }
- %w[ runner pid started_at finished_at stdout stderr exit_status ].each do |column|
- job[column] = nil
- end
- job.state = 'pending'
- else
- Bj.logger.info{ "#{ job.title } - found dead and bloated" }
- job.state = 'dead'
- job.finished_at = now
- end
- job.save!
- end
- end
- end
-
- def archive_jobs
- Bj.transaction do
- now = Util.now
- too_old = now - Bj.ttl
- jobs = Bj::Table::Job.find :all,
- :conditions => ["(state = 'finished' or state = 'dead') and submitted_at < ?", too_old]
- jobs.each do |job|
- Bj.logger.info{ "#{ job.title } - archived" }
- hash = job.to_hash.update(:archived_at => now)
- Bj::Table::JobArchive.create! hash
- job.destroy
- end
- end
- end
-
- def register
- Bj.transaction do
- pid = Bj.config[key]
- return false if Util.alive?(pid)
- Bj.config[key] = Process.pid
- unless Bj.util.ipc_signals_supported? # not winblows
- require "drb"
- DRb.start_service "druby://localhost:0", Process
- Bj.config["#{ Process.pid }.uri"] = DRb.uri
- end
- end
- at_exit{ unregister }
- true
- rescue Exception
- false
- end
-
- def unregister
- Bj.transaction do
- Bj.config.delete key
- end
- true
- rescue Exception
- false
- end
-
- def key
- @key ||= ( options[:ppid] ? Runner.key(options[:ppid]) : Runner.key )
- end
- end
- send :include, Instance_Methods
- end
-end
+class Bj
+ class Runner
+ class Background
+ def self.for(*a, &b) new(*a, &b) end
+
+ attribute "command"
+ attribute "thread"
+ attribute "pid"
+
+ def initialize command
+ @command = command
+ @thread = new_thread
+ end
+
+ def inspect
+ {
+ "command" => command,
+ "pid" => pid,
+ }.inspect
+ end
+
+
+# TODO - auto start runner?
+
+ def new_thread
+ this = self
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ loop do
+ cleanup = lambda{}
+
+ IO.popen command, "r+" do |pipe|
+ this.pid = pid = pipe.pid
+ cleanup = lambda do
+ cleanup = lambda{}
+ begin; Process.kill(Runner.kill_signal, pid); rescue Exception; 42; end
+ end
+ at_exit &cleanup
+ Process.wait
+ end
+
+ Bj.logger.error{ "#{ command } failed with #{ $?.inspect }" } unless
+ [0, 42].include?($?.exitstatus)
+
+ cleanup.call
+
+ sleep 42
+ end
+ end
+ end
+ end
+
+ module ClassMethods
+ attribute("thread"){ Thread.current }
+ attribute("hup_signal"){ Signal.list.keys.index("HUP") ? "HUP" : "ABRT" }
+ attribute("hup_signaled"){ false }
+ attribute("kill_signal"){ "TERM" }
+ attribute("kill_signaled"){ false }
+
+ def tickle
+ return nil if Bj.config[Runner.no_tickle_key]
+ ping or start
+ end
+
+ def ping
+ begin
+ pid = nil
+ uri = nil
+ process = nil
+ Bj.transaction do
+ pid = Bj.config[Runner.key(Process.pid)] || Bj.config[Runner.key]
+ uri = Bj.config["#{ pid }.uri"]
+ process =
+ if uri
+ require "drb"
+ # DRb.start_service "druby://localhost:0"
+ DRbObject.new(nil, uri)
+ else
+ Process
+ end
+ end
+ return nil unless pid
+ pid = Integer pid
+ begin
+ process.kill Runner.hup_signal, pid
+ pid
+ rescue Exception => e
+ false
+ end
+ rescue Exception => e
+ false
+ end
+ end
+
+ def key ppid = 0
+ ppid ||= 0
+ "#{ Bj.rails_env }.#{ ppid }.pid"
+ end
+
+ def no_tickle_key
+ "#{ Bj.rails_env }.no_tickle"
+ end
+
+ def start options = {}
+ options.to_options!
+ background.delete Bj.rails_env if options[:force]
+ background[Bj.rails_env] ||= Background.for(command)
+ end
+
+ def background
+ @background ||= Hash.new
+ end
+
+ def background= value
+ @background ||= value
+ end
+
+ def command
+ "#{ Bj.ruby } " + %W[
+ #{ Bj.script }
+ run
+ --forever
+ --redirect=#{ log }
+ --ppid=#{ Process.pid }
+ --rails_env=#{ Bj.rails_env }
+ --rails_root=#{ Bj.rails_root }
+ ].map{|word| word.inspect}.join(" ")
+ end
+
+ def log
+ File.join logdir, "bj.#{ Bj.hostname }.#{ Bj.rails_env }.log"
+ end
+
+ def logdir
+ File.join File.expand_path(Bj.rails_root), 'log'
+ end
+
+ def run options = {}, &block
+ new(options, &block).run
+ end
+ end
+ send :extend, ClassMethods
+
+ module Instance_Methods
+ attribute "options"
+ attribute "block"
+
+ def initialize options = {}, &block
+ options.to_options!
+ @options, @block = options, block
+ end
+
+ def run
+ wait = options[:wait] || 42
+ limit = options[:limit]
+ forever = options[:forever]
+
+ limit = false if forever
+ wait = Integer wait
+ loopno = 0
+
+ Runner.thread = Thread.current
+ Bj.chroot
+
+ register or exit!(EXIT::WARNING)
+
+ Bj.logger.info{ "STARTED" }
+ at_exit{ Bj.logger.info{ "STOPPED" } }
+
+ fill_morgue
+ install_signal_handlers
+
+ loop do
+ ping_parent
+
+ loopno += 1
+ break if(limit and loopno > limit)
+
+ archive_jobs
+
+ catch :no_jobs do
+ loop do
+ job = thread = stdout = stderr = nil
+
+ Bj.transaction(options) do
+ now = Util.now
+
+ job = Bj::Table::Job.find_first_and_lock(
+ :conditions => ["state = ? and submitted_at <= ?", "pending", now],
+ :order => "priority DESC, submitted_at ASC",
+ :limit => 1,
+ :lock => true)
+ throw :no_jobs unless job
+
+
+ Bj.logger.info{ "#{ job.title } - started" }
+
+ command = job.command
+ env = job.env || {}
+ stdin = job.stdin || ''
+ stdout = job.stdout || ''
+ stderr = job.stderr || ''
+ started_at = Util.now
+
+ thread = Util.start command, :cwd=>Bj.rails_root, :env=>env, :stdin=>stdin, :stdout=>stdout, :stderr=>stderr
+
+ job.state = "running"
+ job.runner = Bj.hostname
+ job.pid = thread.pid
+ job.started_at = started_at
+ job.save!
+ job.reload
+ end
+
+ exit_status = thread.value
+ finished_at = Util.now
+
+ Bj.transaction(options) do
+ job = Bj::Table::Job.find job.id
+ break unless job
+ job.state = "finished"
+ job.finished_at = finished_at
+ job.stdout = stdout
+ job.stderr = stderr
+ job.exit_status = exit_status
+ job.save!
+ job.reload
+ Bj.logger.info{ "#{ job.title } - exit_status=#{ job.exit_status }" }
+ end
+ end
+ end
+
+ Runner.hup_signaled false
+ wait.times do
+ break if Runner.hup_signaled?
+ break if Runner.kill_signaled?
+ sleep 1
+ end
+
+ break unless(limit or limit == false)
+ break if Runner.kill_signaled?
+ end
+ end
+
+ def ping_parent
+ ppid = options[:ppid]
+ return unless ppid
+ begin
+ Process.kill 0, Integer(ppid)
+ rescue Errno::ESRCH
+ Kernel.exit 42
+ rescue Exception
+ 42
+ end
+ end
+
+ def install_signal_handlers
+ Runner.hup_signaled false
+ hup_handler = nil
+ hup_handler =
+ trap Runner.hup_signal do |*a|
+ begin
+ Runner.hup_signaled true
+ rescue Exception => e
+ Bj.logger.error{ e } rescue nil
+ end
+ hup_handler.call *a rescue nil
+ end
+
+ Runner.kill_signaled false
+ kill_handler = nil
+ kill_handler =
+ trap Runner.kill_signal do |*a|
+ begin
+ Runner.kill_signaled true
+ rescue Exception => e
+ Bj.logger.error{ e } rescue nil
+ end
+ kill_handler.call *a rescue nil
+ end
+
+ begin
+ trap("INT"){ exit }
+ rescue Exception
+ end
+ end
+
+ def fill_morgue
+ Bj.transaction do
+ now = Util.now
+ jobs = Bj::Table::Job.find :all,
+ :conditions => ["state = 'running' and runner = ?", Bj.hostname]
+ jobs.each do |job|
+ if job.is_restartable?
+ Bj.logger.info{ "#{ job.title } - found dead and bloated but resubmitted" }
+ %w[ runner pid started_at finished_at stdout stderr exit_status ].each do |column|
+ job[column] = nil
+ end
+ job.state = 'pending'
+ else
+ Bj.logger.info{ "#{ job.title } - found dead and bloated" }
+ job.state = 'dead'
+ job.finished_at = now
+ end
+ job.save!
+ end
+ end
+ end
+
+ def archive_jobs
+ Bj.transaction do
+ now = Util.now
+ too_old = now - Bj.ttl
+ jobs = Bj::Table::Job.find :all,
+ :conditions => ["(state = 'finished' or state = 'dead') and submitted_at < ?", too_old]
+ jobs.each do |job|
+ Bj.logger.info{ "#{ job.title } - archived" }
+ hash = job.to_hash.update(:archived_at => now)
+ Bj::Table::JobArchive.create! hash
+ job.destroy
+ end
+ end
+ end
+
+ def register
+ Bj.transaction do
+ pid = Bj.config[key]
+ return false if Util.alive?(pid)
+ Bj.config[key] = Process.pid
+ unless Bj.util.ipc_signals_supported? # not winblows
+ require "drb"
+ DRb.start_service "druby://localhost:0", Process
+ Bj.config["#{ Process.pid }.uri"] = DRb.uri
+ end
+ end
+ at_exit{ unregister }
+ true
+ rescue Exception
+ false
+ end
+
+ def unregister
+ Bj.transaction do
+ Bj.config.delete key
+ end
+ true
+ rescue Exception
+ false
+ end
+
+ def key
+ @key ||= ( options[:ppid] ? Runner.key(options[:ppid]) : Runner.key )
+ end
+ end
+ send :include, Instance_Methods
+ end
+end