app/models/process/naf/runner.rb in naf-2.1.12 vs app/models/process/naf/runner.rb in naf-2.1.13

- old
+ new

@@ -55,20 +55,23 @@ "nafrunner.yml", "nafrunner-#{Rails.env}.yml", "#{af_name}.yml", "#{af_name}-#{Rails.env}.yml"] @last_machine_log_level = nil + @metric_send_delay = ::Naf.configuration.metric_send_delay end def work check_gc_configurations @machine = ::Naf::Machine.find_by_server_address(@server_address) + @metric_sender = ::Logical::Naf::MetricSender.new(@metric_send_delay, @machine) + unless machine.present? - logger.fatal escape_html("This machine is not configued correctly (ipaddress: #{@server_address}).") - logger.fatal escape_html("Please update #{::Naf::Machine.table_name} with an entry for this machine.") + logger.fatal "This machine is not configued correctly (ipaddress: #{@server_address})." + logger.fatal "Please update #{::Naf::Machine.table_name} with an entry for this machine." logger.fatal "Exiting..." exit 1 end machine.lock_for_runner_use do @@ -99,11 +102,11 @@ def remove_invalid_running_jobs logger.debug "looking for invalid running jobs" ::Naf::RunningJob. joins("INNER JOIN #{Naf.schema_name}.historical_jobs AS hj ON hj.id = #{Naf.schema_name}.running_jobs.id"). where('finished_at IS NOT NULL AND hj.started_on_machine_id = ?', @machine.id).readonly(false).each do |job| - logger.debug escape_html("removing invalid job #{job.inspect}") + logger.debug "removing invalid job #{job.inspect}" job.delete end end def check_gc_configurations @@ -131,15 +134,15 @@ machine.machine_runners.each do |runner| runner.machine_runner_invocations.each do |invocation| if invocation.dead_at.blank? begin retval = Process.kill(0, invocation.pid) - logger.detail escape_html("#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down") + logger.detail "#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down" invocation.wind_down_at = Time.zone.now invocation.save! rescue Errno::ESRCH - logger.detail escape_html("ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running") + logger.detail "ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running" invocation.dead_at = Time.zone.now invocation.save! terminate_old_processes(invocation) end end @@ -182,11 +185,11 @@ machine.mark_up # Make sure no processes are thought to be running on this machine terminate_old_processes(machine) if @kill_all_runners - logger.info escape_html("working: #{machine}") + logger.info "working: #{machine}" @children = {} at_exit { ::Af::Application.singleton.emergency_teardown @@ -205,14 +208,14 @@ def work_machine_loop machine.reload # Check machine status if !machine.enabled - logger.warn escape_html("this machine is disabled #{machine}") + logger.warn "this machine is disabled #{machine}" return false elsif machine.marked_down - logger.warn escape_html("this machine is marked down #{machine}") + logger.warn "this machine is marked down #{machine}" return false end logger.debug "marking machine alive" machine.mark_alive @@ -228,56 +231,69 @@ else check_schedules start_new_jobs end + send_metrics + cleanup_dead_children cleanup_old_processes(1.week, 75.minutes) if (Time.zone.now - @last_cleaned_up_processes) > 1.hour return true end + def send_metrics + # Only send metrics if not winding down, or winding down and only runner. + logger.debug "checking whether it's time to send metrics" + @current_invocation.reload + if @current_invocation.wind_down_at.present? + return nil if @machine.machine_runners.running.count > 0 + end + logger.debug "sending metrics" + @metric_sender.send_metrics + end + def check_log_level if machine.log_level != @last_machine_log_level @last_machine_log_level = machine.log_level unless @last_machine_log_level.blank? logging_configurator.parse_and_set_logger_levels(@last_machine_log_level) end end end def check_schedules - logger.debug escape_html("last time schedules were checked: #{::Naf::Machine.last_time_schedules_were_checked}") + logger.debug "last time schedules were checked: #{::Naf::Machine.last_time_schedules_were_checked}" if ::Naf::Machine.is_it_time_to_check_schedules?(@check_schedules_period.minutes) logger.debug "it's time to check schedules" if ::Naf::ApplicationSchedule.try_lock_schedules logger.debug_gross "checking schedules" machine.mark_checked_schedule ::Naf::ApplicationSchedule.unlock_schedules # check scheduled tasks ::Naf::ApplicationSchedule.should_be_queued.each do |application_schedule| - logger.info escape_html("scheduled application: #{application_schedule}") + logger.info "scheduled application: #{application_schedule}" begin naf_boss = ::Logical::Naf::ConstructionZone::Boss.new # this doesn't work very well for run_group_limits in the thousands Range.new(0, application_schedule.application_run_group_quantum || 1, true).each do naf_boss.enqueue_application_schedule(application_schedule) end rescue ::Naf::HistoricalJob::JobPrerequisiteLoop => jpl - logger.error escape_html("#{machine} couldn't queue schedule because of prerequisite loop: #{jpl.message}") + logger.error "#{machine} couldn't queue schedule because of prerequisite loop: #{jpl.message}" logger.warn jpl application_schedule.enabled = false application_schedule.save! - logger.alarm escape_html("Application Schedule disabled due to loop: #{application_schedule}") + logger.alarm "Application Schedule disabled due to loop: #{application_schedule}" end end # check the runner machines ::Naf::Machine.enabled.up.each do |runner_to_check| if runner_to_check.is_stale?(@runner_stale_period.minutes) - logger.alarm escape_html("runner is stale for #{@runner_stale_period} minutes, #{runner_to_check}") + logger.alarm "runner is stale for #{@runner_stale_period} minutes, #{runner_to_check}" runner_to_check.mark_machine_down(machine) end end end end @@ -297,23 +313,23 @@ end rescue Timeout::Error check_dead_children_not_exited_properly break rescue Errno::ECHILD => e - logger.error escape_html("#{machine} No child when we thought we had children #{@children.inspect}") + logger.error "#{machine} No child when we thought we had children #{@children.inspect}" logger.warn e pid = @children.first.try(:first) status = nil - logger.warn escape_html("pulling first child off list to clean it up: pid=#{pid}") + logger.warn "pulling first child off list to clean it up: pid=#{pid}" end if pid begin cleanup_dead_child(pid, status) rescue ActiveRecord::ActiveRecordError => are - logger.error escape_html("Failure during cleaning up of dead child with pid: #{pid}, status: #{status}") - logger.error escape_html("#{are.message}") + logger.error "Failure during cleaning up of dead child with pid: #{pid}, status: #{status}" + logger.error "#{are.message}" rescue StandardError => e # XXX just incase a job control failure -- more code here logger.error "some failure during child clean up" logger.warn e end @@ -336,11 +352,11 @@ dead_children << child end end unless dead_children.blank? - logger.error escape_html("#{machine}: dead children even with timeout during waitpid2(): #{dead_children.inspect}") + logger.error "#{machine}: dead children even with timeout during waitpid2(): #{dead_children.inspect}" logger.warn "this isn't necessarily incorrect -- look for the pids to be cleaned up next round, if not: call it a bug" end end def cleanup_dead_child(pid, status) @@ -349,60 +365,66 @@ if child_job.present? # Update job tags child_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:work]]) if status.nil? || status.exited? || status.signaled? - logger.info { escape_html("cleaning up dead child: #{child_job.inspect}") } + logger.info { "cleaning up dead child: #{child_job.inspect}" } finish_job(child_job, { exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) }) + if status && status.exitstatus > 0 && !child_job.request_to_terminate + @metric_sender.statsd.event("Naf Job Error", + "#{child_job.inspect} finished with non-zero exit status.", + alert_type: "error", + tags: (::Naf.configuration.metric_tags << "naf:joberror")) + end else # this can happen if the child is sigstopped - logger.warn escape_html("child waited for did not exit: #{child_job.inspect}, status: #{status.inspect}") + logger.warn "child waited for did not exit: #{child_job.inspect}, status: #{status.inspect}" end else # XXX ERROR no child for returned pid -- this can't happen - logger.warn escape_html("child pid: #{pid}, status: #{status.inspect}, not managed by this runner") + logger.warn "child pid: #{pid}, status: #{status.inspect}, not managed by this runner" end end def start_new_jobs - logger.detail escape_html("starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}") + logger.detail "starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}" while ::Naf::RunningJob.where(started_on_machine_id: machine.id).count < machine.thread_pool_size && memory_available_to_spawn? && current_invocation.wind_down_at.blank? - logger.debug_gross escape_html("fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)") + logger.debug_gross "fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)" begin running_job = @job_fetcher.fetch_next_job unless running_job.present? logger.debug_gross "no more jobs to run" break end - logger.info escape_html("starting new job : #{running_job.inspect}") + logger.info "starting new job : #{running_job.inspect}" pid = running_job.historical_job.spawn if pid.present? @children[pid] = running_job running_job.pid = pid running_job.historical_job.pid = pid running_job.historical_job.failed_to_start = false running_job.historical_job.machine_runner_invocation_id = current_invocation.id running_job.save! running_job.historical_job.save! - logger.info escape_html("job started : #{running_job.inspect}") + logger.info "job started : #{running_job.inspect}" else # should never get here (well, hopefully) - logger.error escape_html("#{machine}: failed to execute #{running_job.inspect}") + logger.error "#{machine}: failed to execute #{running_job.inspect}" finish_job(running_job, { failed_to_start: true }) end rescue ActiveRecord::ActiveRecordError => are raise rescue StandardError => e # XXX rescue for various issues - logger.error escape_html("#{machine}: failure during job start") + logger.error "#{machine}: failure during job start" logger.warn e end end logger.debug_gross "done starting jobs" end @@ -474,11 +496,11 @@ return end logger.info "number of old jobs to sift through: #{jobs.length}" jobs.each do |job| - logger.detail escape_html("job still around: #{job.inspect}") + logger.detail "job still around: #{job.inspect}" if job.request_to_terminate == false logger.warn "politely asking process: #{job.pid} to terminate itself" job.request_to_terminate = true job.save! end @@ -498,11 +520,11 @@ if jobs.length == 0 logger.debug_gross "assigned jobs have exited after asking to terminate nicely" return end jobs.each do |job| - logger.warn escape_html("sending SIG_TERM to process: #{job.inspect}") + logger.warn "sending SIG_TERM to process: #{job.inspect}" send_signal_and_maybe_clean_up(job, "TERM") end # wait (1..5).each do |i| @@ -512,11 +534,11 @@ sleep(1) end # kill with fire assigned_jobs(record).each do |job| - logger.alarm escape_html("sending SIG_KILL to process: #{job.inspect}") + logger.alarm "sending SIG_KILL to process: #{job.inspect}" send_signal_and_maybe_clean_up(job, "KILL") # job force job down finish_job(job) end @@ -593,12 +615,7 @@ rescue end sreclaimable end - - def escape_html(str) - CGI::escapeHTML(str) - end - end end