app/models/process/naf/runner.rb in naf-2.1.12 vs app/models/process/naf/runner.rb in naf-2.1.13
- old
+ new
@@ -55,20 +55,23 @@
"nafrunner.yml",
"nafrunner-#{Rails.env}.yml",
"#{af_name}.yml",
"#{af_name}-#{Rails.env}.yml"]
@last_machine_log_level = nil
+ @metric_send_delay = ::Naf.configuration.metric_send_delay
end
def work
check_gc_configurations
@machine = ::Naf::Machine.find_by_server_address(@server_address)
+ @metric_sender = ::Logical::Naf::MetricSender.new(@metric_send_delay, @machine)
+
unless machine.present?
- logger.fatal escape_html("This machine is not configued correctly (ipaddress: #{@server_address}).")
- logger.fatal escape_html("Please update #{::Naf::Machine.table_name} with an entry for this machine.")
+ logger.fatal "This machine is not configued correctly (ipaddress: #{@server_address})."
+ logger.fatal "Please update #{::Naf::Machine.table_name} with an entry for this machine."
logger.fatal "Exiting..."
exit 1
end
machine.lock_for_runner_use do
@@ -99,11 +102,11 @@
def remove_invalid_running_jobs
logger.debug "looking for invalid running jobs"
::Naf::RunningJob.
joins("INNER JOIN #{Naf.schema_name}.historical_jobs AS hj ON hj.id = #{Naf.schema_name}.running_jobs.id").
where('finished_at IS NOT NULL AND hj.started_on_machine_id = ?', @machine.id).readonly(false).each do |job|
- logger.debug escape_html("removing invalid job #{job.inspect}")
+ logger.debug "removing invalid job #{job.inspect}"
job.delete
end
end
def check_gc_configurations
@@ -131,15 +134,15 @@
machine.machine_runners.each do |runner|
runner.machine_runner_invocations.each do |invocation|
if invocation.dead_at.blank?
begin
retval = Process.kill(0, invocation.pid)
- logger.detail escape_html("#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down")
+ logger.detail "#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down"
invocation.wind_down_at = Time.zone.now
invocation.save!
rescue Errno::ESRCH
- logger.detail escape_html("ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running")
+ logger.detail "ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running"
invocation.dead_at = Time.zone.now
invocation.save!
terminate_old_processes(invocation)
end
end
@@ -182,11 +185,11 @@
machine.mark_up
# Make sure no processes are thought to be running on this machine
terminate_old_processes(machine) if @kill_all_runners
- logger.info escape_html("working: #{machine}")
+ logger.info "working: #{machine}"
@children = {}
at_exit {
::Af::Application.singleton.emergency_teardown
@@ -205,14 +208,14 @@
def work_machine_loop
machine.reload
# Check machine status
if !machine.enabled
- logger.warn escape_html("this machine is disabled #{machine}")
+ logger.warn "this machine is disabled #{machine}"
return false
elsif machine.marked_down
- logger.warn escape_html("this machine is marked down #{machine}")
+ logger.warn "this machine is marked down #{machine}"
return false
end
logger.debug "marking machine alive"
machine.mark_alive
@@ -228,56 +231,69 @@
else
check_schedules
start_new_jobs
end
+ send_metrics
+
cleanup_dead_children
cleanup_old_processes(1.week, 75.minutes) if (Time.zone.now - @last_cleaned_up_processes) > 1.hour
return true
end
+ def send_metrics
+ # Only send metrics if not winding down, or winding down and only runner.
+ logger.debug "checking whether it's time to send metrics"
+ @current_invocation.reload
+ if @current_invocation.wind_down_at.present?
+ return nil if @machine.machine_runners.running.count > 0
+ end
+ logger.debug "sending metrics"
+ @metric_sender.send_metrics
+ end
+
def check_log_level
if machine.log_level != @last_machine_log_level
@last_machine_log_level = machine.log_level
unless @last_machine_log_level.blank?
logging_configurator.parse_and_set_logger_levels(@last_machine_log_level)
end
end
end
def check_schedules
- logger.debug escape_html("last time schedules were checked: #{::Naf::Machine.last_time_schedules_were_checked}")
+ logger.debug "last time schedules were checked: #{::Naf::Machine.last_time_schedules_were_checked}"
if ::Naf::Machine.is_it_time_to_check_schedules?(@check_schedules_period.minutes)
logger.debug "it's time to check schedules"
if ::Naf::ApplicationSchedule.try_lock_schedules
logger.debug_gross "checking schedules"
machine.mark_checked_schedule
::Naf::ApplicationSchedule.unlock_schedules
# check scheduled tasks
::Naf::ApplicationSchedule.should_be_queued.each do |application_schedule|
- logger.info escape_html("scheduled application: #{application_schedule}")
+ logger.info "scheduled application: #{application_schedule}"
begin
naf_boss = ::Logical::Naf::ConstructionZone::Boss.new
# this doesn't work very well for run_group_limits in the thousands
Range.new(0, application_schedule.application_run_group_quantum || 1, true).each do
naf_boss.enqueue_application_schedule(application_schedule)
end
rescue ::Naf::HistoricalJob::JobPrerequisiteLoop => jpl
- logger.error escape_html("#{machine} couldn't queue schedule because of prerequisite loop: #{jpl.message}")
+ logger.error "#{machine} couldn't queue schedule because of prerequisite loop: #{jpl.message}"
logger.warn jpl
application_schedule.enabled = false
application_schedule.save!
- logger.alarm escape_html("Application Schedule disabled due to loop: #{application_schedule}")
+ logger.alarm "Application Schedule disabled due to loop: #{application_schedule}"
end
end
# check the runner machines
::Naf::Machine.enabled.up.each do |runner_to_check|
if runner_to_check.is_stale?(@runner_stale_period.minutes)
- logger.alarm escape_html("runner is stale for #{@runner_stale_period} minutes, #{runner_to_check}")
+ logger.alarm "runner is stale for #{@runner_stale_period} minutes, #{runner_to_check}"
runner_to_check.mark_machine_down(machine)
end
end
end
end
@@ -297,23 +313,23 @@
end
rescue Timeout::Error
check_dead_children_not_exited_properly
break
rescue Errno::ECHILD => e
- logger.error escape_html("#{machine} No child when we thought we had children #{@children.inspect}")
+ logger.error "#{machine} No child when we thought we had children #{@children.inspect}"
logger.warn e
pid = @children.first.try(:first)
status = nil
- logger.warn escape_html("pulling first child off list to clean it up: pid=#{pid}")
+ logger.warn "pulling first child off list to clean it up: pid=#{pid}"
end
if pid
begin
cleanup_dead_child(pid, status)
rescue ActiveRecord::ActiveRecordError => are
- logger.error escape_html("Failure during cleaning up of dead child with pid: #{pid}, status: #{status}")
- logger.error escape_html("#{are.message}")
+ logger.error "Failure during cleaning up of dead child with pid: #{pid}, status: #{status}"
+ logger.error "#{are.message}"
rescue StandardError => e
# XXX just incase a job control failure -- more code here
logger.error "some failure during child clean up"
logger.warn e
end
@@ -336,11 +352,11 @@
dead_children << child
end
end
unless dead_children.blank?
- logger.error escape_html("#{machine}: dead children even with timeout during waitpid2(): #{dead_children.inspect}")
+ logger.error "#{machine}: dead children even with timeout during waitpid2(): #{dead_children.inspect}"
logger.warn "this isn't necessarily incorrect -- look for the pids to be cleaned up next round, if not: call it a bug"
end
end
def cleanup_dead_child(pid, status)
@@ -349,60 +365,66 @@
if child_job.present?
# Update job tags
child_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:work]])
if status.nil? || status.exited? || status.signaled?
- logger.info { escape_html("cleaning up dead child: #{child_job.inspect}") }
+ logger.info { "cleaning up dead child: #{child_job.inspect}" }
finish_job(child_job,
{ exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) })
+ if status && status.exitstatus > 0 && !child_job.request_to_terminate
+ @metric_sender.statsd.event("Naf Job Error",
+ "#{child_job.inspect} finished with non-zero exit status.",
+ alert_type: "error",
+ tags: (::Naf.configuration.metric_tags << "naf:joberror"))
+ end
else
# this can happen if the child is sigstopped
- logger.warn escape_html("child waited for did not exit: #{child_job.inspect}, status: #{status.inspect}")
+ logger.warn "child waited for did not exit: #{child_job.inspect}, status: #{status.inspect}"
end
else
# XXX ERROR no child for returned pid -- this can't happen
- logger.warn escape_html("child pid: #{pid}, status: #{status.inspect}, not managed by this runner")
+ logger.warn "child pid: #{pid}, status: #{status.inspect}, not managed by this runner"
end
end
def start_new_jobs
- logger.detail escape_html("starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}")
+ logger.detail "starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}"
while ::Naf::RunningJob.where(started_on_machine_id: machine.id).count < machine.thread_pool_size &&
memory_available_to_spawn? && current_invocation.wind_down_at.blank?
- logger.debug_gross escape_html("fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)")
+ logger.debug_gross "fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)"
begin
running_job = @job_fetcher.fetch_next_job
unless running_job.present?
logger.debug_gross "no more jobs to run"
break
end
- logger.info escape_html("starting new job : #{running_job.inspect}")
+ logger.info "starting new job : #{running_job.inspect}"
pid = running_job.historical_job.spawn
if pid.present?
@children[pid] = running_job
running_job.pid = pid
running_job.historical_job.pid = pid
running_job.historical_job.failed_to_start = false
running_job.historical_job.machine_runner_invocation_id = current_invocation.id
running_job.save!
running_job.historical_job.save!
- logger.info escape_html("job started : #{running_job.inspect}")
+ logger.info "job started : #{running_job.inspect}"
else
# should never get here (well, hopefully)
- logger.error escape_html("#{machine}: failed to execute #{running_job.inspect}")
+ logger.error "#{machine}: failed to execute #{running_job.inspect}"
finish_job(running_job, { failed_to_start: true })
end
rescue ActiveRecord::ActiveRecordError => are
raise
rescue StandardError => e
# XXX rescue for various issues
- logger.error escape_html("#{machine}: failure during job start")
+ logger.error "#{machine}: failure during job start"
logger.warn e
end
end
logger.debug_gross "done starting jobs"
end
@@ -474,11 +496,11 @@
return
end
logger.info "number of old jobs to sift through: #{jobs.length}"
jobs.each do |job|
- logger.detail escape_html("job still around: #{job.inspect}")
+ logger.detail "job still around: #{job.inspect}"
if job.request_to_terminate == false
logger.warn "politely asking process: #{job.pid} to terminate itself"
job.request_to_terminate = true
job.save!
end
@@ -498,11 +520,11 @@
if jobs.length == 0
logger.debug_gross "assigned jobs have exited after asking to terminate nicely"
return
end
jobs.each do |job|
- logger.warn escape_html("sending SIG_TERM to process: #{job.inspect}")
+ logger.warn "sending SIG_TERM to process: #{job.inspect}"
send_signal_and_maybe_clean_up(job, "TERM")
end
# wait
(1..5).each do |i|
@@ -512,11 +534,11 @@
sleep(1)
end
# kill with fire
assigned_jobs(record).each do |job|
- logger.alarm escape_html("sending SIG_KILL to process: #{job.inspect}")
+ logger.alarm "sending SIG_KILL to process: #{job.inspect}"
send_signal_and_maybe_clean_up(job, "KILL")
# job force job down
finish_job(job)
end
@@ -593,12 +615,7 @@
rescue
end
sreclaimable
end
-
- def escape_html(str)
- CGI::escapeHTML(str)
- end
-
end
end