app/models/process/naf/runner.rb in naf-1.1.4 vs app/models/process/naf/runner.rb in naf-2.0.0

- old
+ new

@@ -1,10 +1,13 @@ require 'timeout' module Process::Naf class Runner < ::Af::Application + attr_accessor :machine, + :current_invocation + #---------------- # *** Options *** #+++++++++++++++++ opt :wait_time_for_processes_to_terminate, @@ -39,10 +42,13 @@ "don't modify ruby GC parameters", default: false opt :kill_all_runners, "don't wait for runners to wind down and finish running their jobs", default: false + opt :invocation_uuid, + "unique identifer used for runner logs", + default: `uuidgen` def initialize super opt :log_configuration_files, default: ["af.yml", "af-#{Rails.env}.yml", @@ -54,146 +60,221 @@ "#{af_name}-#{Rails.env}.yml"] @last_machine_log_level = nil end def work - unless @disable_gc_modifications - # These configuration changes will help forked processes, not the runner - ENV['RUBY_HEAP_MIN_SLOTS'] = '500000' - ENV['RUBY_HEAP_SLOTS_INCREMENT'] = '250000' - ENV['RUBY_HEAP_SLOTS_GROWTH_FACTOR'] = '1' - ENV['RUBY_GC_MALLOC_LIMIT'] = '50000000' - end + check_gc_configurations - machine = ::Naf::Machine.find_by_server_address(@server_address) + @machine = ::Naf::Machine.find_by_server_address(@server_address) unless machine.present? logger.fatal "This machine is not configued correctly (ipaddress: #{@server_address})." logger.fatal "Please update #{::Naf::Machine.table_name} with an entry for this machine." logger.fatal "Exiting..." exit 1 end machine.lock_for_runner_use begin - # Wind down other runners - machine.machine_runners.each do |machine_runner| - machine_runner.machine_runner_invocations.each do |invocation| - if invocation.dead_at.blank? - begin - retval = Process.kill(0, invocation.pid) - logger.detail "#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down" - invocation.wind_down_at = Time.zone.now - invocation.save! - rescue Errno::ESRCH - logger.detail "ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running" - invocation.dead_at = Time.zone.now - invocation.save! - end - end - end - end + cleanup_old_processes + wind_down_runners + # Create a machine runner, if it doesn't exist machine_runner = ::Naf::MachineRunner. find_or_create_by_machine_id_and_runner_cwd(machine_id: machine.id, runner_cwd: Dir.pwd) - - begin - repository_name = (`git remote -v`).slice(/:\S+/).sub('.git','')[1..-1] - if repository_name.match(/fatal/) - repository_name = nil - end - rescue - repository_name = nil - end - branch_name = (`git rev-parse --abbrev-ref HEAD`).strip - if branch_name.match(/fatal/) - branch_name = nil - end - commit_information = (`git log --pretty="%H" -n 1`).strip - if commit_information.match(/fatal/) - commit_information = nil - end - deployment_tag = (`git describe --abbrev=0 --tag 2>&1`).strip - if deployment_tag.match(/fatal: No names found, cannot describe anything/) - deployment_tag = nil - end # Create an invocation for this runner - invocation = ::Naf::MachineRunnerInvocation.create!(machine_runner_id: machine_runner.id, - pid: Process.pid, - repository_name: repository_name, - branch_name: branch_name, - commit_information: commit_information, - deployment_tag: deployment_tag) + @current_invocation = ::Naf::MachineRunnerInvocation. + create!({ machine_runner_id: machine_runner.id, + pid: Process.pid, + uuid: @invocation_uuid }.merge!(retrieve_invocation_information)) ensure machine.unlock_for_runner_use end begin - work_machine(machine, invocation) + work_machine ensure - invocation.dead_at = Time.zone.now - invocation.save! + @current_invocation.dead_at = Time.zone.now + @current_invocation.save! + cleanup_old_processes end end - def work_machine(machine, invocation) + def check_gc_configurations + unless @disable_gc_modifications + # These configuration changes will help forked processes, not the runner + ENV['RUBY_HEAP_MIN_SLOTS'] = '500000' + ENV['RUBY_HEAP_SLOTS_INCREMENT'] = '250000' + ENV['RUBY_HEAP_SLOTS_GROWTH_FACTOR'] = '1' + ENV['RUBY_GC_MALLOC_LIMIT'] = '50000000' + end + end + + def cleanup_old_processes + machine.machine_runners.each do |runner| + runner.machine_runner_invocations.recently_marked_dead(24.hours).each do |invocation| + terminate_old_processes(invocation) + end + end + end + + def wind_down_runners + machine.machine_runners.each do |runner| + runner.machine_runner_invocations.each do |invocation| + if invocation.dead_at.blank? + begin + retval = Process.kill(0, invocation.pid) + logger.detail "#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down" + invocation.wind_down_at = Time.zone.now + invocation.save! + rescue Errno::ESRCH + logger.detail "ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running" + invocation.dead_at = Time.zone.now + invocation.save! + terminate_old_processes(invocation) + end + end + end + end + end + + def retrieve_invocation_information + begin + repository_name = (`git remote -v`).slice(/:\S+/).sub('.git','')[1..-1] + if repository_name.match(/fatal/) + repository_name = nil + end + rescue + repository_name = nil + end + branch_name = (`git rev-parse --abbrev-ref HEAD`).strip + if branch_name.match(/fatal/) + branch_name = nil + end + commit_information = (`git log --pretty="%H" -n 1`).strip + if commit_information.match(/fatal/) + commit_information = nil + end + deployment_tag = (`git describe --abbrev=0 --tag 2>&1`).strip + if deployment_tag.match(/fatal: No names found, cannot describe anything/) + deployment_tag = nil + end + + { + repository_name: repository_name, + branch_name: branch_name, + commit_information: commit_information, + deployment_tag: deployment_tag + } + end + + def work_machine machine.mark_alive machine.mark_up # Make sure no processes are thought to be running on this machine terminate_old_processes(machine) if @kill_all_runners - logger.info "working: #{machine}" + logger.info escape_html("working: #{machine}") @children = {} + @threads = {} at_exit { ::Af::Application.singleton.emergency_teardown } @job_fetcher = ::Logical::Naf::JobFetcher.new(machine) while true - break unless work_machine_loop(machine, invocation) + break unless work_machine_loop GC.start end logger.info "runner quitting" end - def work_machine_loop(machine, invocation) + def work_machine_loop machine.reload # Check machine status if !machine.enabled - logger.warn "this machine is disabled #{machine}" + logger.warn escape_html("this machine is disabled #{machine}") return false elsif machine.marked_down - logger.warn "this machine is marked down #{machine}" + logger.warn escape_html("this machine is marked down #{machine}") return false end machine.mark_alive + check_log_level + + @current_invocation.reload + if current_invocation.wind_down_at.present? + logger.warn "invocation asked to wind down" + if @children.length == 0 + return false; + end + else + check_schedules + start_new_jobs + end + + cleanup_dead_children + + return true + end + + def check_log_level if machine.log_level != @last_machine_log_level @last_machine_log_level = machine.log_level unless @last_machine_log_level.blank? logging_configurator.parse_and_set_logger_levels(@last_machine_log_level) end end + end - invocation.reload - if invocation.wind_down_at.present? - logger.warn "invocation asked to wind down" - if @children.length == 0 - return false; + def check_schedules + if ::Naf::Machine.is_it_time_to_check_schedules?(@check_schedules_period.minutes) + logger.debug "it's time to check schedules" + if ::Naf::ApplicationSchedule.try_lock_schedules + logger.debug_gross "checking schedules" + machine.mark_checked_schedule + ::Naf::ApplicationSchedule.unlock_schedules + + # check scheduled tasks + should_be_queued.each do |application_schedule| + logger.info escape_html("scheduled application: #{application_schedule}") + begin + naf_boss = ::Logical::Naf::ConstructionZone::Boss.new + # this doesn't work very well for run_group_limits in the thousands + Range.new(0, application_schedule.application_run_group_limit || 1, true).each do + naf_boss.enqueue_application_schedule(application_schedule) + end + rescue ::Naf::HistoricalJob::JobPrerequisiteLoop => jpl + logger.error escape_html("#{machine} couldn't queue schedule because of prerequisite loop: #{jpl.message}") + logger.warn jpl + application_schedule.enabled = false + application_schedule.save! + logger.alarm escape_html("Application Schedule disabled due to loop: #{application_schedule}") + end + end + + # check the runner machines + ::Naf::Machine.enabled.up.each do |runner_to_check| + if runner_to_check.is_stale?(@runner_stale_period.minutes) + logger.alarm escape_html("runner is stale for #{@runner_stale_period} minutes, #{runner_to_check}") + runner_to_check.mark_machine_down(machine) + end + end end end + end - check_schedules(machine) if invocation.wind_down_at.blank? - + def cleanup_dead_children # clean up children that have exited logger.detail "cleaning up dead children: #{@children.length}" if @children.length > 0 while @children.length > 0 @@ -202,58 +283,26 @@ begin Timeout::timeout(@loop_sleep_time) do pid, status = Process.waitpid2(-1) end rescue Timeout::Error - # XXX is there a race condition where a child process exits - # XXX has not set pid or status yet and timeout fires? - # XXX i bet there is - # XXX so this code is here: - dead_children = [] - @children.each do |pid, child| - unless is_job_process_alive?(child) - dead_children << child - end - end - - unless dead_children.blank? - logger.error "#{machine}: dead children even with timeout during waitpid2(): #{dead_children.inspect}" - logger.warn "this isn't necessarily incorrect -- look for the pids to be cleaned up next round, if not: call it a bug" - end - + check_dead_children_not_exited_properly break rescue Errno::ECHILD => e - logger.error "#{machine} No child when we thought we had children #{@children.inspect}" + logger.error escape_html("#{machine} No child when we thought we had children #{@children.inspect}") logger.warn e pid = @children.first.try(:first) status = nil logger.warn "pulling first child off list to clean it up: pid=#{pid}" end if pid begin - child_job = @children.delete(pid) - - if child_job.present? - # Update job tags - child_job.historical_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:work]]) - - if status.nil? || status.exited? || status.signaled? - logger.info { "cleaning up dead child: #{child_job.reload}" } - finish_job(child_job, - { exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) }) - else - # this can happen if the child is sigstopped - logger.warn "child waited for did not exit: #{child_job}, status: #{status.inspect}" - end - else - # XXX ERROR no child for returned pid -- this can't happen - logger.warn "child pid: #{pid}, status: #{status.inspect}, not managed by this runner" - end + cleanup_dead_child(pid, status) rescue ActiveRecord::ActiveRecordError => are - logger.error "Failure during cleaning up of dead child with pid: #{pid}" - logger.error "#{are.message}" + logger.error escape_html("Failure during cleaning up of dead child with pid: #{pid}, status: #{status}") + logger.error escape_html("#{are.message}") rescue StandardError => e # XXX just incase a job control failure -- more code here logger.error "some failure during child clean up" logger.warn e end @@ -261,93 +310,158 @@ end else logger.detail "sleeping in loop: #{@loop_sleep_time} seconds" sleep(@loop_sleep_time) end + end + # XXX is there a race condition where a child process exits + # XXX has not set pid or status yet and timeout fires? + # XXX i bet there is + # XXX so this code is here: + def check_dead_children_not_exited_properly + dead_children = [] + @children.each do |pid, child| + unless is_job_process_alive?(child.reload) + dead_children << child + end + end + + unless dead_children.blank? + logger.error escape_html("#{machine}: dead children even with timeout during waitpid2(): #{dead_children.inspect}") + logger.warn "this isn't necessarily incorrect -- look for the pids to be cleaned up next round, if not: call it a bug" + end + end + + def cleanup_dead_child(pid, status) + child_job = @children.delete(pid) + + if child_job.present? + # Update job tags + child_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:work]]) + + if status.nil? || status.exited? || status.signaled? + logger.info { escape_html("cleaning up dead child: #{child_job.reload}") } + finish_job(child_job, + { exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) }) + + thread = @threads.delete(pid) + logger.detail escape_html("cleaning up threads: #{thread.inspect}") + logger.detail escape_html("thread list: #{Thread.list}") + thread.join + else + # this can happen if the child is sigstopped + logger.warn escape_html("child waited for did not exit: #{child_job}, status: #{status.inspect}") + end + else + # XXX ERROR no child for returned pid -- this can't happen + logger.warn "child pid: #{pid}, status: #{status.inspect}, not managed by this runner" + end + end + + def start_new_jobs # start new jobs logger.detail "starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}" - # XXX while @children.length < machine.thread_pool_size && memory_available_to_spawn? && invocation.wind_down_at.blank? - while ::Naf::RunningJob.where(:started_on_machine_id => machine.id).count < machine.thread_pool_size && - memory_available_to_spawn? && invocation.wind_down_at.blank? + # XXX while @children.length < machine.thread_pool_size && memory_available_to_spawn? && current_invocation.wind_down_at.blank? + while ::Naf::RunningJob.where(started_on_machine_id: machine.id).count < machine.thread_pool_size && + memory_available_to_spawn? && current_invocation.wind_down_at.blank? logger.debug_gross "fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)" begin running_job = @job_fetcher.fetch_next_job unless running_job.present? logger.debug_gross "no more jobs to run" break end - logger.info "starting new job : #{running_job}" + logger.info escape_html("starting new job : #{running_job.inspect}") - pid = running_job.historical_job.spawn + # fork and run + pid, stdin, stdout, stderr = running_job.historical_job.spawn + stdin.close + + # Reset NAF_JOB_ID + ENV.delete('NAF_JOB_ID') if pid @children[pid] = running_job running_job.pid = pid running_job.historical_job.pid = pid running_job.historical_job.failed_to_start = false - running_job.historical_job.machine_runner_invocation_id = invocation.id - logger.info "job started : #{running_job}" + running_job.historical_job.machine_runner_invocation_id = current_invocation.id + logger.info escape_html("job started : #{running_job}") running_job.save! running_job.historical_job.save! + + # Spawn a thread to output the log of each job to files. + # + # Make sure not to execute any database calls inside this + # block, as it will start an ActiveRecord connection for each + # thread and eventually raise a ConnetionTimeoutError, resulting + # the runner to exit. + thread = Thread.new do + log_output_until_job_finishes(running_job.id, stdout, stderr) + end + @threads[pid] = thread else # should never get here (well, hopefully) - logger.error "#{machine}: failed to execute #{running_job}" + logger.error escape_html("#{machine}: failed to execute #{running_job}") finish_job(running_job, { failed_to_start: true }) end rescue ActiveRecord::ActiveRecordError => are raise rescue StandardError => e # XXX rescue for various issues - logger.error "#{machine}: failure during job start" + logger.error escape_html("#{machine}: failure during job start") logger.warn e end end logger.debug_gross "done starting jobs" + end - return true + def log_output_until_job_finishes(job_id, stdout, stderr) + log_file = ::Logical::Naf::LogFile.new("#{::Naf::PREFIX_PATH}/#{::Naf.schema_name}/jobs/#{job_id}") + log_file.open - end + # Continue reading logs from stdout/stderror until it reaches end of file + while true + read_pipes = [] + read_pipes << stdout if stdout + read_pipes << stderr if stderr + return if (read_pipes.length == 0) - def check_schedules(machine) - if ::Naf::Machine.is_it_time_to_check_schedules?(@check_schedules_period.minutes) - logger.debug "it's time to check schedules" - if ::Naf::ApplicationSchedule.try_lock_schedules - logger.debug_gross "checking schedules" - machine.mark_checked_schedule - ::Naf::ApplicationSchedule.unlock_schedules + error_pipes = read_pipes.clone + read_array, write_array, error_array = Kernel.select(read_pipes, nil, error_pipes, 1) - # check scheduled tasks - should_be_queued(machine).each do |application_schedule| - logger.info "scheduled application: #{application_schedule}" - begin - naf_boss = ::Logical::Naf::ConstructionZone::Boss.new - # this doesn't work very well for run_group_limits in the thousands - Range.new(0, application_schedule.application_run_group_limit || 1, true).each do - naf_boss.enqueue_application_schedule(application_schedule) + unless error_array.blank? + logger.error escape_html("job(#{job_id}): select returned error for #{error_pipes.inspect} (read_pipes: #{read_pipes.inspect})") + # XXX we should probably close the errored FDs + end + + unless read_array.blank? + begin + for r in read_array do + begin + # Parse each log line into JSON + r.read_nonblock(10240).split("\n").each do |log| + log_file << log.rstrip + end + rescue Errno::EAGAIN + rescue Errno::EINTR + rescue EOFError => eof + stdout = nil if r == stdout + stderr = nil if r == stderr end - rescue ::Naf::HistoricalJob::JobPrerequisiteLoop => jpl - logger.error "#{machine} couldn't queue schedule because of prerequisite loop: #{jpl.message}" - logger.warn jpl - application_schedule.enabled = false - application_schedule.save! - logger.alarm "Application Schedule disabled due to loop: #{application_schedule}" end + ensure + log_file.write end - - # check the runner machines - ::Naf::Machine.enabled.up.each do |runner_to_check| - if runner_to_check.is_stale?(@runner_stale_period.minutes) - logger.alarm "runner is stale for #{@runner_stale_period} minutes, #{runner_to_check}" - runner_to_check.mark_machine_down(machine) - end - end end end + + log_file.close end # XXX update_all doesn't support "from_partition" so we have this helper def update_historical_job(updates, historical_job_id) updates[:updated_at] = Time.zone.now @@ -362,19 +476,19 @@ SQL ::Naf::HistoricalJob.find_by_sql([update_sql] + updates.values + [historical_job_id]) end def finish_job(running_job, updates = {}) - running_job.historical_job.remove_all_tags - running_job.historical_job.add_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:cleanup]]) + if running_job.present? + running_job.remove_all_tags + running_job.add_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:cleanup]]) + end ::Naf::HistoricalJob.transaction do update_historical_job(updates.merge({ finished_at: Time.zone.now }), running_job.id) running_job.delete end - - running_job.historical_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:cleanup]]) end # kill(0, pid) seems to fail during at_exit block # so this shoots from the hip def emergency_teardown @@ -390,59 +504,59 @@ # force job down finish_job(child) end end - def terminate_old_processes(machine) + def terminate_old_processes(record) # check if any processes are hanging around and ask them # politely if they will please terminate - jobs = assigned_jobs(machine) + jobs = assigned_jobs(record) if jobs.length == 0 logger.detail "no jobs to remove" return end logger.info "number of old jobs to sift through: #{jobs.length}" jobs.each do |job| - logger.detail "job still around: #{job}" + logger.detail escape_html("job still around: #{job}") if job.request_to_terminate == false logger.warn "politely asking process: #{job.pid} to terminate itself" job.request_to_terminate = true job.save! end end # wait (1..@wait_time_for_processes_to_terminate).each do |i| - num_assigned_jobs = assigned_jobs(machine).length + num_assigned_jobs = assigned_jobs(record).length return if num_assigned_jobs == 0 logger.debug_medium "#{i}/#{@wait_time_for_processes_to_terminate}: sleeping 1 second while we wait for " + "#{num_assigned_jobs} assigned job(s) to terminate as requested" sleep(1) end # nudge them to terminate - jobs = assigned_jobs(machine) + jobs = assigned_jobs(record) if jobs.length == 0 logger.debug_gross "assigned jobs have exited after asking to terminate nicely" return end jobs.each do |job| - logger.warn "sending SIG_TERM to process: #{job}" + logger.warn escape_html("sending SIG_TERM to process: #{job}") send_signal_and_maybe_clean_up(job, "TERM") end # wait (1..5).each do |i| - num_assigned_jobs = assigned_jobs(machine).length + num_assigned_jobs = assigned_jobs(record).length return if num_assigned_jobs == 0 logger.debug_medium "#{i}/5: sleeping 1 second while we wait for #{num_assigned_jobs} assigned job(s) to terminate from SIG_TERM" sleep(1) end # kill with fire - assigned_jobs(machine).each do |job| - logger.alarm "sending SIG_KILL to process: #{job}" + assigned_jobs(record).each do |job| + logger.alarm escape_html("sending SIG_KILL to process: #{job}") send_signal_and_maybe_clean_up(job, "KILL") # job force job down finish_job(job) end @@ -471,17 +585,23 @@ def is_job_process_alive?(job) return send_signal_and_maybe_clean_up(job, 0) end - def assigned_jobs(machine) - return ::Naf::RunningJob.assigned_jobs(machine).select do |job| - is_job_process_alive?(job) + def assigned_jobs(record) + if record.kind_of? ::Naf::MachineRunnerInvocation + return ::Naf::RunningJob.started_on_invocation(record.id).select do |job| + is_job_process_alive?(job) + end + else + return ::Naf::RunningJob.assigned_jobs(record).select do |job| + is_job_process_alive?(job) + end end end - def should_be_queued(machine) + def should_be_queued not_finished_applications = ::Naf::HistoricalJob. queued_between(Time.zone.now - Naf::HistoricalJob::JOB_STALE_TIME, Time.zone.now). where("finished_at IS NULL AND request_to_terminate = false"). find_all{ |job| job.application_id.present? }. index_by{ |job| job.application_id } @@ -532,9 +652,13 @@ return true end logger.alarm "#{Facter.hostname}.#{Facter.domain}: not enough memory to spawn: #{memory_free_percentage}% (free) < #{@minimum_memory_free}% (min percent)" return false + end + + def escape_html(str) + CGI::escapeHTML(str) end end end