app/models/process/naf/runner.rb in naf-1.1.4 vs app/models/process/naf/runner.rb in naf-2.0.0
- old
+ new
@@ -1,10 +1,13 @@
require 'timeout'
module Process::Naf
class Runner < ::Af::Application
+ attr_accessor :machine,
+ :current_invocation
+
#----------------
# *** Options ***
#+++++++++++++++++
opt :wait_time_for_processes_to_terminate,
@@ -39,10 +42,13 @@
"don't modify ruby GC parameters",
default: false
opt :kill_all_runners,
"don't wait for runners to wind down and finish running their jobs",
default: false
+ opt :invocation_uuid,
+ "unique identifer used for runner logs",
+ default: `uuidgen`
def initialize
super
opt :log_configuration_files, default: ["af.yml",
"af-#{Rails.env}.yml",
@@ -54,146 +60,221 @@
"#{af_name}-#{Rails.env}.yml"]
@last_machine_log_level = nil
end
def work
- unless @disable_gc_modifications
- # These configuration changes will help forked processes, not the runner
- ENV['RUBY_HEAP_MIN_SLOTS'] = '500000'
- ENV['RUBY_HEAP_SLOTS_INCREMENT'] = '250000'
- ENV['RUBY_HEAP_SLOTS_GROWTH_FACTOR'] = '1'
- ENV['RUBY_GC_MALLOC_LIMIT'] = '50000000'
- end
+ check_gc_configurations
- machine = ::Naf::Machine.find_by_server_address(@server_address)
+ @machine = ::Naf::Machine.find_by_server_address(@server_address)
unless machine.present?
logger.fatal "This machine is not configued correctly (ipaddress: #{@server_address})."
logger.fatal "Please update #{::Naf::Machine.table_name} with an entry for this machine."
logger.fatal "Exiting..."
exit 1
end
machine.lock_for_runner_use
begin
- # Wind down other runners
- machine.machine_runners.each do |machine_runner|
- machine_runner.machine_runner_invocations.each do |invocation|
- if invocation.dead_at.blank?
- begin
- retval = Process.kill(0, invocation.pid)
- logger.detail "#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down"
- invocation.wind_down_at = Time.zone.now
- invocation.save!
- rescue Errno::ESRCH
- logger.detail "ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running"
- invocation.dead_at = Time.zone.now
- invocation.save!
- end
- end
- end
- end
+ cleanup_old_processes
+ wind_down_runners
+
# Create a machine runner, if it doesn't exist
machine_runner = ::Naf::MachineRunner.
find_or_create_by_machine_id_and_runner_cwd(machine_id: machine.id,
runner_cwd: Dir.pwd)
-
- begin
- repository_name = (`git remote -v`).slice(/:\S+/).sub('.git','')[1..-1]
- if repository_name.match(/fatal/)
- repository_name = nil
- end
- rescue
- repository_name = nil
- end
- branch_name = (`git rev-parse --abbrev-ref HEAD`).strip
- if branch_name.match(/fatal/)
- branch_name = nil
- end
- commit_information = (`git log --pretty="%H" -n 1`).strip
- if commit_information.match(/fatal/)
- commit_information = nil
- end
- deployment_tag = (`git describe --abbrev=0 --tag 2>&1`).strip
- if deployment_tag.match(/fatal: No names found, cannot describe anything/)
- deployment_tag = nil
- end
# Create an invocation for this runner
- invocation = ::Naf::MachineRunnerInvocation.create!(machine_runner_id: machine_runner.id,
- pid: Process.pid,
- repository_name: repository_name,
- branch_name: branch_name,
- commit_information: commit_information,
- deployment_tag: deployment_tag)
+ @current_invocation = ::Naf::MachineRunnerInvocation.
+ create!({ machine_runner_id: machine_runner.id,
+ pid: Process.pid,
+ uuid: @invocation_uuid }.merge!(retrieve_invocation_information))
ensure
machine.unlock_for_runner_use
end
begin
- work_machine(machine, invocation)
+ work_machine
ensure
- invocation.dead_at = Time.zone.now
- invocation.save!
+ @current_invocation.dead_at = Time.zone.now
+ @current_invocation.save!
+ cleanup_old_processes
end
end
- def work_machine(machine, invocation)
+ def check_gc_configurations
+ unless @disable_gc_modifications
+ # These configuration changes will help forked processes, not the runner
+ ENV['RUBY_HEAP_MIN_SLOTS'] = '500000'
+ ENV['RUBY_HEAP_SLOTS_INCREMENT'] = '250000'
+ ENV['RUBY_HEAP_SLOTS_GROWTH_FACTOR'] = '1'
+ ENV['RUBY_GC_MALLOC_LIMIT'] = '50000000'
+ end
+ end
+
+ def cleanup_old_processes
+ machine.machine_runners.each do |runner|
+ runner.machine_runner_invocations.recently_marked_dead(24.hours).each do |invocation|
+ terminate_old_processes(invocation)
+ end
+ end
+ end
+
+ def wind_down_runners
+ machine.machine_runners.each do |runner|
+ runner.machine_runner_invocations.each do |invocation|
+ if invocation.dead_at.blank?
+ begin
+ retval = Process.kill(0, invocation.pid)
+ logger.detail "#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down"
+ invocation.wind_down_at = Time.zone.now
+ invocation.save!
+ rescue Errno::ESRCH
+ logger.detail "ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running"
+ invocation.dead_at = Time.zone.now
+ invocation.save!
+ terminate_old_processes(invocation)
+ end
+ end
+ end
+ end
+ end
+
+ def retrieve_invocation_information
+ begin
+ repository_name = (`git remote -v`).slice(/:\S+/).sub('.git','')[1..-1]
+ if repository_name.match(/fatal/)
+ repository_name = nil
+ end
+ rescue
+ repository_name = nil
+ end
+ branch_name = (`git rev-parse --abbrev-ref HEAD`).strip
+ if branch_name.match(/fatal/)
+ branch_name = nil
+ end
+ commit_information = (`git log --pretty="%H" -n 1`).strip
+ if commit_information.match(/fatal/)
+ commit_information = nil
+ end
+ deployment_tag = (`git describe --abbrev=0 --tag 2>&1`).strip
+ if deployment_tag.match(/fatal: No names found, cannot describe anything/)
+ deployment_tag = nil
+ end
+
+ {
+ repository_name: repository_name,
+ branch_name: branch_name,
+ commit_information: commit_information,
+ deployment_tag: deployment_tag
+ }
+ end
+
+ def work_machine
machine.mark_alive
machine.mark_up
# Make sure no processes are thought to be running on this machine
terminate_old_processes(machine) if @kill_all_runners
- logger.info "working: #{machine}"
+ logger.info escape_html("working: #{machine}")
@children = {}
+ @threads = {}
at_exit {
::Af::Application.singleton.emergency_teardown
}
@job_fetcher = ::Logical::Naf::JobFetcher.new(machine)
while true
- break unless work_machine_loop(machine, invocation)
+ break unless work_machine_loop
GC.start
end
logger.info "runner quitting"
end
- def work_machine_loop(machine, invocation)
+ def work_machine_loop
machine.reload
# Check machine status
if !machine.enabled
- logger.warn "this machine is disabled #{machine}"
+ logger.warn escape_html("this machine is disabled #{machine}")
return false
elsif machine.marked_down
- logger.warn "this machine is marked down #{machine}"
+ logger.warn escape_html("this machine is marked down #{machine}")
return false
end
machine.mark_alive
+ check_log_level
+
+ @current_invocation.reload
+ if current_invocation.wind_down_at.present?
+ logger.warn "invocation asked to wind down"
+ if @children.length == 0
+ return false;
+ end
+ else
+ check_schedules
+ start_new_jobs
+ end
+
+ cleanup_dead_children
+
+ return true
+ end
+
+ def check_log_level
if machine.log_level != @last_machine_log_level
@last_machine_log_level = machine.log_level
unless @last_machine_log_level.blank?
logging_configurator.parse_and_set_logger_levels(@last_machine_log_level)
end
end
+ end
- invocation.reload
- if invocation.wind_down_at.present?
- logger.warn "invocation asked to wind down"
- if @children.length == 0
- return false;
+ def check_schedules
+ if ::Naf::Machine.is_it_time_to_check_schedules?(@check_schedules_period.minutes)
+ logger.debug "it's time to check schedules"
+ if ::Naf::ApplicationSchedule.try_lock_schedules
+ logger.debug_gross "checking schedules"
+ machine.mark_checked_schedule
+ ::Naf::ApplicationSchedule.unlock_schedules
+
+ # check scheduled tasks
+ should_be_queued.each do |application_schedule|
+ logger.info escape_html("scheduled application: #{application_schedule}")
+ begin
+ naf_boss = ::Logical::Naf::ConstructionZone::Boss.new
+ # this doesn't work very well for run_group_limits in the thousands
+ Range.new(0, application_schedule.application_run_group_limit || 1, true).each do
+ naf_boss.enqueue_application_schedule(application_schedule)
+ end
+ rescue ::Naf::HistoricalJob::JobPrerequisiteLoop => jpl
+ logger.error escape_html("#{machine} couldn't queue schedule because of prerequisite loop: #{jpl.message}")
+ logger.warn jpl
+ application_schedule.enabled = false
+ application_schedule.save!
+ logger.alarm escape_html("Application Schedule disabled due to loop: #{application_schedule}")
+ end
+ end
+
+ # check the runner machines
+ ::Naf::Machine.enabled.up.each do |runner_to_check|
+ if runner_to_check.is_stale?(@runner_stale_period.minutes)
+ logger.alarm escape_html("runner is stale for #{@runner_stale_period} minutes, #{runner_to_check}")
+ runner_to_check.mark_machine_down(machine)
+ end
+ end
end
end
+ end
- check_schedules(machine) if invocation.wind_down_at.blank?
-
+ def cleanup_dead_children
# clean up children that have exited
logger.detail "cleaning up dead children: #{@children.length}"
if @children.length > 0
while @children.length > 0
@@ -202,58 +283,26 @@
begin
Timeout::timeout(@loop_sleep_time) do
pid, status = Process.waitpid2(-1)
end
rescue Timeout::Error
- # XXX is there a race condition where a child process exits
- # XXX has not set pid or status yet and timeout fires?
- # XXX i bet there is
- # XXX so this code is here:
- dead_children = []
- @children.each do |pid, child|
- unless is_job_process_alive?(child)
- dead_children << child
- end
- end
-
- unless dead_children.blank?
- logger.error "#{machine}: dead children even with timeout during waitpid2(): #{dead_children.inspect}"
- logger.warn "this isn't necessarily incorrect -- look for the pids to be cleaned up next round, if not: call it a bug"
- end
-
+ check_dead_children_not_exited_properly
break
rescue Errno::ECHILD => e
- logger.error "#{machine} No child when we thought we had children #{@children.inspect}"
+ logger.error escape_html("#{machine} No child when we thought we had children #{@children.inspect}")
logger.warn e
pid = @children.first.try(:first)
status = nil
logger.warn "pulling first child off list to clean it up: pid=#{pid}"
end
if pid
begin
- child_job = @children.delete(pid)
-
- if child_job.present?
- # Update job tags
- child_job.historical_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:work]])
-
- if status.nil? || status.exited? || status.signaled?
- logger.info { "cleaning up dead child: #{child_job.reload}" }
- finish_job(child_job,
- { exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) })
- else
- # this can happen if the child is sigstopped
- logger.warn "child waited for did not exit: #{child_job}, status: #{status.inspect}"
- end
- else
- # XXX ERROR no child for returned pid -- this can't happen
- logger.warn "child pid: #{pid}, status: #{status.inspect}, not managed by this runner"
- end
+ cleanup_dead_child(pid, status)
rescue ActiveRecord::ActiveRecordError => are
- logger.error "Failure during cleaning up of dead child with pid: #{pid}"
- logger.error "#{are.message}"
+ logger.error escape_html("Failure during cleaning up of dead child with pid: #{pid}, status: #{status}")
+ logger.error escape_html("#{are.message}")
rescue StandardError => e
# XXX just in case of a job control failure -- more code here
logger.error "some failure during child clean up"
logger.warn e
end
@@ -261,93 +310,158 @@
end
else
logger.detail "sleeping in loop: #{@loop_sleep_time} seconds"
sleep(@loop_sleep_time)
end
+ end
+ # XXX is there a race condition where a child process exits
+ # XXX has not set pid or status yet and timeout fires?
+ # XXX i bet there is
+ # XXX so this code is here:
+ def check_dead_children_not_exited_properly
+ dead_children = []
+ @children.each do |pid, child|
+ unless is_job_process_alive?(child.reload)
+ dead_children << child
+ end
+ end
+
+ unless dead_children.blank?
+ logger.error escape_html("#{machine}: dead children even with timeout during waitpid2(): #{dead_children.inspect}")
+ logger.warn "this isn't necessarily incorrect -- look for the pids to be cleaned up next round, if not: call it a bug"
+ end
+ end
+
+ def cleanup_dead_child(pid, status)
+ child_job = @children.delete(pid)
+
+ if child_job.present?
+ # Update job tags
+ child_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:work]])
+
+ if status.nil? || status.exited? || status.signaled?
+ logger.info { escape_html("cleaning up dead child: #{child_job.reload}") }
+ finish_job(child_job,
+ { exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) })
+
+ thread = @threads.delete(pid)
+ logger.detail escape_html("cleaning up threads: #{thread.inspect}")
+ logger.detail escape_html("thread list: #{Thread.list}")
+ thread.join
+ else
+ # this can happen if the child is sigstopped
+ logger.warn escape_html("child waited for did not exit: #{child_job}, status: #{status.inspect}")
+ end
+ else
+ # XXX ERROR no child for returned pid -- this can't happen
+ logger.warn "child pid: #{pid}, status: #{status.inspect}, not managed by this runner"
+ end
+ end
+
+ def start_new_jobs
# start new jobs
logger.detail "starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}"
- # XXX while @children.length < machine.thread_pool_size && memory_available_to_spawn? && invocation.wind_down_at.blank?
- while ::Naf::RunningJob.where(:started_on_machine_id => machine.id).count < machine.thread_pool_size &&
- memory_available_to_spawn? && invocation.wind_down_at.blank?
+ # XXX while @children.length < machine.thread_pool_size && memory_available_to_spawn? && current_invocation.wind_down_at.blank?
+ while ::Naf::RunningJob.where(started_on_machine_id: machine.id).count < machine.thread_pool_size &&
+ memory_available_to_spawn? && current_invocation.wind_down_at.blank?
logger.debug_gross "fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)"
begin
running_job = @job_fetcher.fetch_next_job
unless running_job.present?
logger.debug_gross "no more jobs to run"
break
end
- logger.info "starting new job : #{running_job}"
+ logger.info escape_html("starting new job : #{running_job.inspect}")
- pid = running_job.historical_job.spawn
+ # fork and run
+ pid, stdin, stdout, stderr = running_job.historical_job.spawn
+ stdin.close
+
+ # Reset NAF_JOB_ID
+ ENV.delete('NAF_JOB_ID')
if pid
@children[pid] = running_job
running_job.pid = pid
running_job.historical_job.pid = pid
running_job.historical_job.failed_to_start = false
- running_job.historical_job.machine_runner_invocation_id = invocation.id
- logger.info "job started : #{running_job}"
+ running_job.historical_job.machine_runner_invocation_id = current_invocation.id
+ logger.info escape_html("job started : #{running_job}")
running_job.save!
running_job.historical_job.save!
+
+ # Spawn a thread to output the log of each job to files.
+ #
+ # Make sure not to execute any database calls inside this
+ # block, as it will start an ActiveRecord connection for each
+ # thread and eventually raise a ConnectionTimeoutError, causing
+ # the runner to exit.
+ thread = Thread.new do
+ log_output_until_job_finishes(running_job.id, stdout, stderr)
+ end
+ @threads[pid] = thread
else
# should never get here (well, hopefully)
- logger.error "#{machine}: failed to execute #{running_job}"
+ logger.error escape_html("#{machine}: failed to execute #{running_job}")
finish_job(running_job, { failed_to_start: true })
end
rescue ActiveRecord::ActiveRecordError => are
raise
rescue StandardError => e
# XXX rescue for various issues
- logger.error "#{machine}: failure during job start"
+ logger.error escape_html("#{machine}: failure during job start")
logger.warn e
end
end
logger.debug_gross "done starting jobs"
+ end
- return true
+ def log_output_until_job_finishes(job_id, stdout, stderr)
+ log_file = ::Logical::Naf::LogFile.new("#{::Naf::PREFIX_PATH}/#{::Naf.schema_name}/jobs/#{job_id}")
+ log_file.open
- end
+ # Continue reading logs from stdout/stderr until both reach end of file
+ while true
+ read_pipes = []
+ read_pipes << stdout if stdout
+ read_pipes << stderr if stderr
+ return if (read_pipes.length == 0)
- def check_schedules(machine)
- if ::Naf::Machine.is_it_time_to_check_schedules?(@check_schedules_period.minutes)
- logger.debug "it's time to check schedules"
- if ::Naf::ApplicationSchedule.try_lock_schedules
- logger.debug_gross "checking schedules"
- machine.mark_checked_schedule
- ::Naf::ApplicationSchedule.unlock_schedules
+ error_pipes = read_pipes.clone
+ read_array, write_array, error_array = Kernel.select(read_pipes, nil, error_pipes, 1)
- # check scheduled tasks
- should_be_queued(machine).each do |application_schedule|
- logger.info "scheduled application: #{application_schedule}"
- begin
- naf_boss = ::Logical::Naf::ConstructionZone::Boss.new
- # this doesn't work very well for run_group_limits in the thousands
- Range.new(0, application_schedule.application_run_group_limit || 1, true).each do
- naf_boss.enqueue_application_schedule(application_schedule)
+ unless error_array.blank?
+ logger.error escape_html("job(#{job_id}): select returned error for #{error_pipes.inspect} (read_pipes: #{read_pipes.inspect})")
+ # XXX we should probably close the errored FDs
+ end
+
+ unless read_array.blank?
+ begin
+ for r in read_array do
+ begin
+ # Parse each log line into JSON
+ r.read_nonblock(10240).split("\n").each do |log|
+ log_file << log.rstrip
+ end
+ rescue Errno::EAGAIN
+ rescue Errno::EINTR
+ rescue EOFError => eof
+ stdout = nil if r == stdout
+ stderr = nil if r == stderr
end
- rescue ::Naf::HistoricalJob::JobPrerequisiteLoop => jpl
- logger.error "#{machine} couldn't queue schedule because of prerequisite loop: #{jpl.message}"
- logger.warn jpl
- application_schedule.enabled = false
- application_schedule.save!
- logger.alarm "Application Schedule disabled due to loop: #{application_schedule}"
end
+ ensure
+ log_file.write
end
-
- # check the runner machines
- ::Naf::Machine.enabled.up.each do |runner_to_check|
- if runner_to_check.is_stale?(@runner_stale_period.minutes)
- logger.alarm "runner is stale for #{@runner_stale_period} minutes, #{runner_to_check}"
- runner_to_check.mark_machine_down(machine)
- end
- end
end
end
+
+ log_file.close
end
# XXX update_all doesn't support "from_partition" so we have this helper
def update_historical_job(updates, historical_job_id)
updates[:updated_at] = Time.zone.now
@@ -362,19 +476,19 @@
SQL
::Naf::HistoricalJob.find_by_sql([update_sql] + updates.values + [historical_job_id])
end
def finish_job(running_job, updates = {})
- running_job.historical_job.remove_all_tags
- running_job.historical_job.add_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:cleanup]])
+ if running_job.present?
+ running_job.remove_all_tags
+ running_job.add_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:cleanup]])
+ end
::Naf::HistoricalJob.transaction do
update_historical_job(updates.merge({ finished_at: Time.zone.now }), running_job.id)
running_job.delete
end
-
- running_job.historical_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:cleanup]])
end
# kill(0, pid) seems to fail during at_exit block
# so this shoots from the hip
def emergency_teardown
@@ -390,59 +504,59 @@
# force job down
finish_job(child)
end
end
- def terminate_old_processes(machine)
+ def terminate_old_processes(record)
# check if any processes are hanging around and ask them
# politely if they will please terminate
- jobs = assigned_jobs(machine)
+ jobs = assigned_jobs(record)
if jobs.length == 0
logger.detail "no jobs to remove"
return
end
logger.info "number of old jobs to sift through: #{jobs.length}"
jobs.each do |job|
- logger.detail "job still around: #{job}"
+ logger.detail escape_html("job still around: #{job}")
if job.request_to_terminate == false
logger.warn "politely asking process: #{job.pid} to terminate itself"
job.request_to_terminate = true
job.save!
end
end
# wait
(1..@wait_time_for_processes_to_terminate).each do |i|
- num_assigned_jobs = assigned_jobs(machine).length
+ num_assigned_jobs = assigned_jobs(record).length
return if num_assigned_jobs == 0
logger.debug_medium "#{i}/#{@wait_time_for_processes_to_terminate}: sleeping 1 second while we wait for " +
"#{num_assigned_jobs} assigned job(s) to terminate as requested"
sleep(1)
end
# nudge them to terminate
- jobs = assigned_jobs(machine)
+ jobs = assigned_jobs(record)
if jobs.length == 0
logger.debug_gross "assigned jobs have exited after asking to terminate nicely"
return
end
jobs.each do |job|
- logger.warn "sending SIG_TERM to process: #{job}"
+ logger.warn escape_html("sending SIG_TERM to process: #{job}")
send_signal_and_maybe_clean_up(job, "TERM")
end
# wait
(1..5).each do |i|
- num_assigned_jobs = assigned_jobs(machine).length
+ num_assigned_jobs = assigned_jobs(record).length
return if num_assigned_jobs == 0
logger.debug_medium "#{i}/5: sleeping 1 second while we wait for #{num_assigned_jobs} assigned job(s) to terminate from SIG_TERM"
sleep(1)
end
# kill with fire
- assigned_jobs(machine).each do |job|
- logger.alarm "sending SIG_KILL to process: #{job}"
+ assigned_jobs(record).each do |job|
+ logger.alarm escape_html("sending SIG_KILL to process: #{job}")
send_signal_and_maybe_clean_up(job, "KILL")
# force job down
finish_job(job)
end
@@ -471,17 +585,23 @@
def is_job_process_alive?(job)
return send_signal_and_maybe_clean_up(job, 0)
end
- def assigned_jobs(machine)
- return ::Naf::RunningJob.assigned_jobs(machine).select do |job|
- is_job_process_alive?(job)
+ def assigned_jobs(record)
+ if record.kind_of? ::Naf::MachineRunnerInvocation
+ return ::Naf::RunningJob.started_on_invocation(record.id).select do |job|
+ is_job_process_alive?(job)
+ end
+ else
+ return ::Naf::RunningJob.assigned_jobs(record).select do |job|
+ is_job_process_alive?(job)
+ end
end
end
- def should_be_queued(machine)
+ def should_be_queued
not_finished_applications = ::Naf::HistoricalJob.
queued_between(Time.zone.now - Naf::HistoricalJob::JOB_STALE_TIME, Time.zone.now).
where("finished_at IS NULL AND request_to_terminate = false").
find_all{ |job| job.application_id.present? }.
index_by{ |job| job.application_id }
@@ -532,9 +652,13 @@
return true
end
logger.alarm "#{Facter.hostname}.#{Facter.domain}: not enough memory to spawn: #{memory_free_percentage}% (free) < #{@minimum_memory_free}% (min percent)"
return false
+ end
+
+ def escape_html(str)
+ CGI::escapeHTML(str)
end
end
end