app/models/process/naf/runner.rb in naf-2.1.10 vs app/models/process/naf/runner.rb in naf-2.1.11
- old
+ new
@@ -2,11 +2,12 @@
module Process::Naf
class Runner < ::Af::Application
attr_accessor :machine,
- :current_invocation
+ :current_invocation,
+ :last_cleaned_up_processes
#----------------
# *** Options ***
#+++++++++++++++++
@@ -62,18 +63,17 @@
check_gc_configurations
@machine = ::Naf::Machine.find_by_server_address(@server_address)
unless machine.present?
- logger.fatal "This machine is not configued correctly (ipaddress: #{@server_address})."
- logger.fatal "Please update #{::Naf::Machine.table_name} with an entry for this machine."
+ logger.fatal escape_html("This machine is not configued correctly (ipaddress: #{@server_address}).")
+ logger.fatal escape_html("Please update #{::Naf::Machine.table_name} with an entry for this machine.")
logger.fatal "Exiting..."
exit 1
end
- machine.lock_for_runner_use
- begin
+ machine.lock_for_runner_use do
cleanup_old_processes
remove_invalid_running_jobs
wind_down_runners
# Create a machine runner, if it doesn't exist
@@ -83,12 +83,10 @@
# Create an invocation for this runner
@current_invocation = ::Naf::MachineRunnerInvocation.
create!({ machine_runner_id: machine_runner.id,
pid: Process.pid,
uuid: @invocation_uuid }.merge!(retrieve_invocation_information))
- ensure
- machine.unlock_for_runner_use
end
begin
work_machine
ensure
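Note: the manual begin/ensure pairing around the runner bootstrap is replaced by the block form of machine.lock_for_runner_use, which presumably wraps the yielded work in its own ensure so the lock is released even if creating the invocation raises. The two call styles compare roughly as follows (do_bootstrap_work is a placeholder, not a naf method):

    # old style: the caller is responsible for releasing the lock
    machine.lock_for_runner_use
    begin
      do_bootstrap_work
    ensure
      machine.unlock_for_runner_use
    end

    # new style: the helper owns the unlock, so an exception cannot leak the lock
    machine.lock_for_runner_use do
      do_bootstrap_work
    end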
@@ -97,30 +95,35 @@
cleanup_old_processes
end
end
def remove_invalid_running_jobs
+ logger.debug "looking for invalid running jobs"
::Naf::RunningJob.
joins("INNER JOIN #{Naf.schema_name}.historical_jobs AS hj ON hj.id = #{Naf.schema_name}.running_jobs.id").
where('finished_at IS NOT NULL AND hj.started_on_machine_id = ?', @machine.id).readonly(false).each do |job|
+ logger.debug escape_html("removing invalid job #{job.inspect}")
job.delete
end
end
def check_gc_configurations
+ logger.debug "checking garbage collection configurations"
unless @disable_gc_modifications
# These configuration changes will help forked processes, not the runner
ENV['RUBY_HEAP_MIN_SLOTS'] = '500000'
ENV['RUBY_HEAP_SLOTS_INCREMENT'] = '250000'
ENV['RUBY_HEAP_SLOTS_GROWTH_FACTOR'] = '1'
ENV['RUBY_GC_MALLOC_LIMIT'] = '50000000'
end
end
- def cleanup_old_processes
- machine.machine_runners.each do |runner|
- runner.machine_runner_invocations.recently_marked_dead(24.hours).each do |invocation|
+ def cleanup_old_processes(created_at_interval = 1.month, marked_dead_interval = 24.hours)
+ @last_cleaned_up_processes = Time.zone.now
+ logger.debug "cleaning up old processes"
+ ::Naf::MachineRunner.where("created_at >= ?", Time.zone.now - created_at_interval).each do |runner|
+ runner.machine_runner_invocations.recently_marked_dead(marked_dead_interval).each do |invocation|
terminate_old_processes(invocation)
end
end
end
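cleanup_old_processes is now parameterized and stamps @last_cleaned_up_processes on entry so callers can rate-limit it; it also scans recent ::Naf::MachineRunner rows directly instead of only machine.machine_runners. The two call shapes used in this file are:

    cleanup_old_processes                       # defaults: runners created in the last month,
                                                # invocations marked dead in the last 24 hours
    cleanup_old_processes(1.week, 75.minutes)   # tighter sweep used from the hourly check in work_machine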
@@ -128,15 +131,15 @@
machine.machine_runners.each do |runner|
runner.machine_runner_invocations.each do |invocation|
if invocation.dead_at.blank?
begin
retval = Process.kill(0, invocation.pid)
- logger.detail "#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down"
+ logger.detail escape_html("#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down")
invocation.wind_down_at = Time.zone.now
invocation.save!
rescue Errno::ESRCH
- logger.detail "ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running"
+ logger.detail escape_html("ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running")
invocation.dead_at = Time.zone.now
invocation.save!
terminate_old_processes(invocation)
end
end
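For reference, the kill(0, pid) probe in this block delivers no signal; it only checks that the process exists and raises Errno::ESRCH when it does not, which is why the rescue branch marks the invocation dead. A standalone sketch of the same check, assuming only Process from the standard library:

    def process_alive?(pid)
      Process.kill(0, pid)   # signal 0: existence/permission check, nothing is delivered
      true
    rescue Errno::ESRCH      # no such process
      false
    rescue Errno::EPERM      # exists, but owned by another user
      true
    end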
@@ -209,10 +212,11 @@
elsif machine.marked_down
logger.warn escape_html("this machine is marked down #{machine}")
return false
end
+ logger.debug "marking machine alive"
machine.mark_alive
check_log_level
@current_invocation.reload
@@ -225,10 +229,11 @@
check_schedules
start_new_jobs
end
cleanup_dead_children
+ cleanup_old_processes(1.week, 75.minutes) if (Time.zone.now - @last_cleaned_up_processes) > 1.hour
return true
end
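work_machine now re-runs cleanup_old_processes with a narrower window at most once per hour, keyed off the @last_cleaned_up_processes timestamp that cleanup_old_processes sets on entry. The throttle reduces to roughly (ActiveSupport assumed for 1.hour and Time.zone, as elsewhere in naf):

    if (Time.zone.now - @last_cleaned_up_processes) > 1.hour
      cleanup_old_processes(1.week, 75.minutes)   # tighter window than the bootstrap sweep
    end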
def check_log_level
@@ -296,11 +301,11 @@
rescue Errno::ECHILD => e
logger.error escape_html("#{machine} No child when we thought we had children #{@children.inspect}")
logger.warn e
pid = @children.first.try(:first)
status = nil
- logger.warn "pulling first child off list to clean it up: pid=#{pid}"
+ logger.warn escape_html("pulling first child off list to clean it up: pid=#{pid}")
end
if pid
begin
cleanup_dead_child(pid, status)
@@ -325,11 +330,11 @@
# XXX i bet there is
# XXX so this code is here:
def check_dead_children_not_exited_properly
dead_children = []
@children.each do |pid, child|
- unless is_job_process_alive?(child.reload)
+ unless is_job_process_alive?(child)
dead_children << child
end
end
unless dead_children.blank?
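Dropping the .reload in check_dead_children_not_exited_properly avoids a database round trip per child on every pass, and avoids the ActiveRecord::RecordNotFound that reload raises if the running_jobs row has already been deleted; the liveness check, which ultimately boils down to a kill(0, pid) probe via send_signal_and_maybe_clean_up (shown later in this file), only needs the pid already held in memory. In short:

    is_job_process_alive?(child.reload)   # re-fetches the row; raises if it was deleted out from under us
    is_job_process_alive?(child)          # the kill(0, pid) probe only needs the in-memory pid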
@@ -344,31 +349,29 @@
if child_job.present?
# Update job tags
child_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:work]])
if status.nil? || status.exited? || status.signaled?
- logger.info { escape_html("cleaning up dead child: #{child_job.reload}") }
+ logger.info { escape_html("cleaning up dead child: #{child_job.inspect}") }
finish_job(child_job,
{ exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) })
else
# this can happen if the child is sigstopped
- logger.warn escape_html("child waited for did not exit: #{child_job}, status: #{status.inspect}")
+ logger.warn escape_html("child waited for did not exit: #{child_job.inspect}, status: #{status.inspect}")
end
else
# XXX ERROR no child for returned pid -- this can't happen
- logger.warn "child pid: #{pid}, status: #{status.inspect}, not managed by this runner"
+ logger.warn escape_html("child pid: #{pid}, status: #{status.inspect}, not managed by this runner")
end
end
def start_new_jobs
- # start new jobs
- logger.detail "starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}"
- # XXX while @children.length < machine.thread_pool_size && memory_available_to_spawn? && current_invocation.wind_down_at.blank?
+ logger.detail escape_html("starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}")
while ::Naf::RunningJob.where(started_on_machine_id: machine.id).count < machine.thread_pool_size &&
memory_available_to_spawn? && current_invocation.wind_down_at.blank?
- logger.debug_gross "fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)"
+ logger.debug_gross escape_html("fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)")
begin
running_job = @job_fetcher.fetch_next_job
unless running_job.present?
logger.debug_gross "no more jobs to run"
@@ -382,16 +385,16 @@
@children[pid] = running_job
running_job.pid = pid
running_job.historical_job.pid = pid
running_job.historical_job.failed_to_start = false
running_job.historical_job.machine_runner_invocation_id = current_invocation.id
- logger.info escape_html("job started : #{running_job}")
running_job.save!
running_job.historical_job.save!
+ logger.info escape_html("job started : #{running_job.inspect}")
else
# should never get here (well, hopefully)
- logger.error escape_html("#{machine}: failed to execute #{running_job}")
+ logger.error escape_html("#{machine}: failed to execute #{running_job.inspect}")
finish_job(running_job, { failed_to_start: true })
end
rescue ActiveRecord::ActiveRecordError => are
raise
@@ -402,11 +405,11 @@
end
end
logger.debug_gross "done starting jobs"
end
- # XXX update_all doesn't support "from_partition" so we have this helper
+ # update_all doesn't support "from_partition" so we have this helper
def update_historical_job(updates, historical_job_id)
updates[:updated_at] = Time.zone.now
update_columns = updates.map{ |k,v| "#{k} = ?" }.join(", ")
update_sql = <<-SQL
UPDATE
@@ -418,30 +421,44 @@
SQL
::Naf::HistoricalJob.find_by_sql([update_sql] + updates.values + [historical_job_id])
end
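update_historical_job builds its UPDATE by hand because ActiveRecord's update_all does not go through the from_partition scope naf uses for historical_jobs (the comment above simply drops the stale XXX marker). A typical call from finish_job and the statement it produces, roughly:

    update_historical_job({ exit_status: 0, finished_at: Time.zone.now }, job.id)
    # roughly: UPDATE <historical_jobs partition> SET exit_status = ?, finished_at = ?, updated_at = ? WHERE id = ?
    #          (updated_at is added by the helper; the update values and the job id are bound as parameters)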
def finish_job(running_job, updates = {})
- if running_job.present?
- running_job.remove_all_tags
- running_job.add_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:cleanup]])
+ # Check to see if running job still exists
+ job = ::Naf::RunningJob.find_by_id(running_job.id)
+ if job.present?
+ job.lock_for_runner_use do
+ ::Naf::HistoricalJob.transaction do
+ update_historical_job(updates.merge({ finished_at: Time.zone.now }), job.id)
+ job.delete
+ end
+ end
+ else
+ job = ::Naf::HistoricalJob.find_by_id(running_job.id)
+ # This does not seem to be needed, but it is kept just for extra measure
+ if job.present?
+ job.lock_for_runner_use do
+ ::Naf::HistoricalJob.transaction do
+ update_historical_job(updates.merge({ finished_at: Time.zone.now }), job.id)
+ end
+ end
+ end
end
-
- ::Naf::HistoricalJob.transaction do
- update_historical_job(updates.merge({ finished_at: Time.zone.now }), running_job.id)
- running_job.delete
- end
end
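finish_job now branches on which row still exists instead of unconditionally touching running_jobs: if the running_jobs row is present it is locked (via the same lock_for_runner_use block form), the historical row is stamped and the running row deleted in one transaction; if only the historical row remains, just the finish columns are stamped. In effect:

    finish_job(running_job, { exit_status: status && status.exitstatus })
    # running_jobs row present  -> lock it, update historical_jobs (finished_at, exit_status, ...), delete the running row
    # only historical row left  -> update historical_jobs alone (late or repeated cleanup)
    # neither row found         -> nothing to do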
# kill(0, pid) seems to fail during at_exit block
# so this shoots from the hip
def emergency_teardown
return if @children.length == 0
logger.warn "emergency teardown of #{@children.length} job(s)"
@children.clone.each do |pid, child|
send_signal_and_maybe_clean_up(child, "TERM")
end
+
+ # Wait 2 seconds
sleep(2)
+
@children.clone.each do |pid, child|
send_signal_and_maybe_clean_up(child, "KILL")
# force job down
finish_job(child)
@@ -454,13 +471,14 @@
jobs = assigned_jobs(record)
if jobs.length == 0
logger.detail "no jobs to remove"
return
end
+
logger.info "number of old jobs to sift through: #{jobs.length}"
jobs.each do |job|
- logger.detail escape_html("job still around: #{job}")
+ logger.detail escape_html("job still around: #{job.inspect}")
if job.request_to_terminate == false
logger.warn "politely asking process: #{job.pid} to terminate itself"
job.request_to_terminate = true
job.save!
end
@@ -480,11 +498,11 @@
if jobs.length == 0
logger.debug_gross "assigned jobs have exited after asking to terminate nicely"
return
end
jobs.each do |job|
- logger.warn escape_html("sending SIG_TERM to process: #{job}")
+ logger.warn escape_html("sending SIG_TERM to process: #{job.inspect}")
send_signal_and_maybe_clean_up(job, "TERM")
end
# wait
(1..5).each do |i|
@@ -494,11 +512,11 @@
sleep(1)
end
# kill with fire
assigned_jobs(record).each do |job|
- logger.alarm escape_html("sending SIG_KILL to process: #{job}")
+ logger.alarm escape_html("sending SIG_KILL to process: #{job.inspect}")
send_signal_and_maybe_clean_up(job, "KILL")
# force job down
finish_job(job)
end
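The pattern in terminate_old_processes (and in emergency_teardown above) is a standard escalation: set request_to_terminate, then SIGTERM, wait briefly, then SIGKILL and force the bookkeeping closed with finish_job. A self-contained sketch of the signal half, independent of naf's models:

    # Polite-then-forceful shutdown of a single pid (grace period in seconds).
    def stop_process(pid, grace_seconds = 5)
      Process.kill("TERM", pid)     # polite: the process may trap this and clean up
      grace_seconds.times do
        sleep(1)
        Process.kill(0, pid)        # raises Errno::ESRCH once the process has exited
      end
      Process.kill("KILL", pid)     # still alive after the grace period: cannot be ignored
    rescue Errno::ESRCH
      # already gone at some point above -- nothing more to do
    end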
@@ -520,10 +538,11 @@
# job does not exist -- mark it finished
finish_job(job)
return false
end
+
return true
end
def is_job_process_alive?(job)
return send_signal_and_maybe_clean_up(job, 0)
@@ -543,13 +562,26 @@
def memory_available_to_spawn?
Facter.clear
memory_size = Facter.memorysize_mb.to_f
memory_free = Facter.memoryfree_mb.to_f
+ memory_free_percentage = ((memory_free + sreclaimable_memory) / memory_size) * 100.0
- # Linux breaks out kernel cache-use memory into an SReclaimable stat
- # in /proc/meminfo which should be counted as free, but facter does not.
+ if (memory_free_percentage >= @minimum_memory_free)
+ logger.detail "memory available: #{memory_free_percentage}% (free) >= " +
+ "#{@minimum_memory_free}% (min percent)"
+ return true
+ end
+ logger.alarm "#{Facter.hostname}.#{Facter.domain}: not enough memory to spawn: " +
+ "#{memory_free_percentage}% (free) < #{@minimum_memory_free}% (min percent)"
+
+ return false
+ end
+
+ # Linux breaks out kernel cache-use memory into an SReclaimable stat
+ # in /proc/meminfo which should be counted as free, but facter does not.
+ def sreclaimable_memory
sreclaimable = 0.0
begin
File.readlines('/proc/meminfo').each do |l|
if l =~ /^(?:SReclaimable):\s+(\d+)\s+\S+/
# Convert the memory from Kilobytes to Gigabytes and
@@ -559,20 +591,10 @@
end
end
rescue
end
- memory_free_percentage = ((memory_free + sreclaimable) / memory_size) * 100.0
-
- if (memory_free_percentage >= @minimum_memory_free)
- logger.detail "memory available: #{memory_free_percentage}% (free) >= " +
- "#{@minimum_memory_free}% (min percent)"
- return true
- end
- logger.alarm "#{Facter.hostname}.#{Facter.domain}: not enough memory to spawn: " +
- "#{memory_free_percentage}% (free) < #{@minimum_memory_free}% (min percent)"
-
- return false
+ sreclaimable
end
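memory_available_to_spawn? now makes the decision up front, with the /proc/meminfo parsing pulled out into the new sreclaimable_memory helper; facter's free-memory figure does not include reclaimable kernel cache, so the helper's value is added back in before comparing against the configured minimum:

    memory_free_percentage = ((memory_free + sreclaimable_memory) / memory_size) * 100.0
    memory_free_percentage >= @minimum_memory_free   # true => safe to fork another job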
def escape_html(str)
CGI::escapeHTML(str)
end
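Most of the logging changes in this release route interpolated strings through this escape_html helper: CGI.escapeHTML turns &, <, >, ' and " into entities, so records rendered with inspect cannot inject markup into an HTML-based log viewer. For example (the record shown is illustrative):

    require 'cgi'
    CGI.escapeHTML(%q{job started : #<Naf::RunningJob id: 1, command: "echo <hi>">})
    # => "job started : #&lt;Naf::RunningJob id: 1, command: &quot;echo &lt;hi&gt;&quot;&gt;"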