app/models/process/naf/runner.rb in naf-2.1.10 vs app/models/process/naf/runner.rb in naf-2.1.11

- old
+ new

@@ -2,11 +2,12 @@ module Process::Naf class Runner < ::Af::Application attr_accessor :machine, - :current_invocation + :current_invocation, + :last_cleaned_up_processes #---------------- # *** Options *** #+++++++++++++++++ @@ -62,18 +63,17 @@ check_gc_configurations @machine = ::Naf::Machine.find_by_server_address(@server_address) unless machine.present? - logger.fatal "This machine is not configued correctly (ipaddress: #{@server_address})." - logger.fatal "Please update #{::Naf::Machine.table_name} with an entry for this machine." + logger.fatal escape_html("This machine is not configued correctly (ipaddress: #{@server_address}).") + logger.fatal escape_html("Please update #{::Naf::Machine.table_name} with an entry for this machine.") logger.fatal "Exiting..." exit 1 end - machine.lock_for_runner_use - begin + machine.lock_for_runner_use do cleanup_old_processes remove_invalid_running_jobs wind_down_runners # Create a machine runner, if it doesn't exist @@ -83,12 +83,10 @@ # Create an invocation for this runner @current_invocation = ::Naf::MachineRunnerInvocation. create!({ machine_runner_id: machine_runner.id, pid: Process.pid, uuid: @invocation_uuid }.merge!(retrieve_invocation_information)) - ensure - machine.unlock_for_runner_use end begin work_machine ensure @@ -97,30 +95,35 @@ cleanup_old_processes end end def remove_invalid_running_jobs + logger.debug "looking for invalid running jobs" ::Naf::RunningJob. joins("INNER JOIN #{Naf.schema_name}.historical_jobs AS hj ON hj.id = #{Naf.schema_name}.running_jobs.id"). where('finished_at IS NOT NULL AND hj.started_on_machine_id = ?', @machine.id).readonly(false).each do |job| + logger.debug escape_html("removing invalid job #{job.inspect}") job.delete end end def check_gc_configurations + logger.debug "checking garbage collection configurations" unless @disable_gc_modifications # These configuration changes will help forked processes, not the runner ENV['RUBY_HEAP_MIN_SLOTS'] = '500000' ENV['RUBY_HEAP_SLOTS_INCREMENT'] = '250000' ENV['RUBY_HEAP_SLOTS_GROWTH_FACTOR'] = '1' ENV['RUBY_GC_MALLOC_LIMIT'] = '50000000' end end - def cleanup_old_processes - machine.machine_runners.each do |runner| - runner.machine_runner_invocations.recently_marked_dead(24.hours).each do |invocation| + def cleanup_old_processes(created_at_interval = 1.month, marked_dead_interval = 24.hours) + @last_cleaned_up_processes = Time.zone.now + logger.debug "cleaning up old processes" + ::Naf::MachineRunner.where("created_at >= ?", Time.zone.now - created_at_interval).each do |runner| + runner.machine_runner_invocations.recently_marked_dead(marked_dead_interval).each do |invocation| terminate_old_processes(invocation) end end end @@ -128,15 +131,15 @@ machine.machine_runners.each do |runner| runner.machine_runner_invocations.each do |invocation| if invocation.dead_at.blank? begin retval = Process.kill(0, invocation.pid) - logger.detail "#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down" + logger.detail escape_html("#{retval} = kill(0, #{invocation.pid}) -- process alive, marking runner invocation as winding down") invocation.wind_down_at = Time.zone.now invocation.save! rescue Errno::ESRCH - logger.detail "ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running" + logger.detail escape_html("ESRCH = kill(0, #{invocation.pid}) -- marking runner invocation as not running") invocation.dead_at = Time.zone.now invocation.save! terminate_old_processes(invocation) end end @@ -209,10 +212,11 @@ elsif machine.marked_down logger.warn escape_html("this machine is marked down #{machine}") return false end + logger.debug "marking machine alive" machine.mark_alive check_log_level @current_invocation.reload @@ -225,10 +229,11 @@ check_schedules start_new_jobs end cleanup_dead_children + cleanup_old_processes(1.week, 75.minutes) if (Time.zone.now - @last_cleaned_up_processes) > 1.hour return true end def check_log_level @@ -296,11 +301,11 @@ rescue Errno::ECHILD => e logger.error escape_html("#{machine} No child when we thought we had children #{@children.inspect}") logger.warn e pid = @children.first.try(:first) status = nil - logger.warn "pulling first child off list to clean it up: pid=#{pid}" + logger.warn escape_html("pulling first child off list to clean it up: pid=#{pid}") end if pid begin cleanup_dead_child(pid, status) @@ -325,11 +330,11 @@ # XXX i bet there is # XXX so this code is here: def check_dead_children_not_exited_properly dead_children = [] @children.each do |pid, child| - unless is_job_process_alive?(child.reload) + unless is_job_process_alive?(child) dead_children << child end end unless dead_children.blank? @@ -344,31 +349,29 @@ if child_job.present? # Update job tags child_job.remove_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:work]]) if status.nil? || status.exited? || status.signaled? - logger.info { escape_html("cleaning up dead child: #{child_job.reload}") } + logger.info { escape_html("cleaning up dead child: #{child_job.inspect}") } finish_job(child_job, { exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) }) else # this can happen if the child is sigstopped - logger.warn escape_html("child waited for did not exit: #{child_job}, status: #{status.inspect}") + logger.warn escape_html("child waited for did not exit: #{child_job.inspect}, status: #{status.inspect}") end else # XXX ERROR no child for returned pid -- this can't happen - logger.warn "child pid: #{pid}, status: #{status.inspect}, not managed by this runner" + logger.warn escape_html("child pid: #{pid}, status: #{status.inspect}, not managed by this runner") end end def start_new_jobs - # start new jobs - logger.detail "starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}" - # XXX while @children.length < machine.thread_pool_size && memory_available_to_spawn? && current_invocation.wind_down_at.blank? + logger.detail escape_html("starting new jobs, num children: #{@children.length}/#{machine.thread_pool_size}") while ::Naf::RunningJob.where(started_on_machine_id: machine.id).count < machine.thread_pool_size && memory_available_to_spawn? && current_invocation.wind_down_at.blank? - logger.debug_gross "fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)" + logger.debug_gross escape_html("fetching jobs because: children: #{@children.length} < #{machine.thread_pool_size} (poolsize)") begin running_job = @job_fetcher.fetch_next_job unless running_job.present? logger.debug_gross "no more jobs to run" @@ -382,16 +385,16 @@ @children[pid] = running_job running_job.pid = pid running_job.historical_job.pid = pid running_job.historical_job.failed_to_start = false running_job.historical_job.machine_runner_invocation_id = current_invocation.id - logger.info escape_html("job started : #{running_job}") running_job.save! running_job.historical_job.save! + logger.info escape_html("job started : #{running_job.inspect}") else # should never get here (well, hopefully) - logger.error escape_html("#{machine}: failed to execute #{running_job}") + logger.error escape_html("#{machine}: failed to execute #{running_job.inspect}") finish_job(running_job, { failed_to_start: true }) end rescue ActiveRecord::ActiveRecordError => are raise @@ -402,11 +405,11 @@ end end logger.debug_gross "done starting jobs" end - # XXX update_all doesn't support "from_partition" so we have this helper + # update_all doesn't support "from_partition" so we have this helper def update_historical_job(updates, historical_job_id) updates[:updated_at] = Time.zone.now update_columns = updates.map{ |k,v| "#{k} = ?" }.join(", ") update_sql = <<-SQL UPDATE @@ -418,30 +421,44 @@ SQL ::Naf::HistoricalJob.find_by_sql([update_sql] + updates.values + [historical_job_id]) end def finish_job(running_job, updates = {}) - if running_job.present? - running_job.remove_all_tags - running_job.add_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:cleanup]]) + # Check to see if running job still exists + job = ::Naf::RunningJob.find_by_id(running_job.id) + if job.present? + job.lock_for_runner_use do + ::Naf::HistoricalJob.transaction do + update_historical_job(updates.merge({ finished_at: Time.zone.now }), job.id) + job.delete + end + end + else + job = ::Naf::HistoricalJob.find_by_id(running_job.id) + # This does not seem to be need, but just for extra measure + if job.present? + job.lock_for_runner_use do + ::Naf::HistoricalJob.transaction do + update_historical_job(updates.merge({ finished_at: Time.zone.now }), job.id) + end + end + end end - - ::Naf::HistoricalJob.transaction do - update_historical_job(updates.merge({ finished_at: Time.zone.now }), running_job.id) - running_job.delete - end end # kill(0, pid) seems to fail during at_exit block # so this shoots from the hip def emergency_teardown return if @children.length == 0 logger.warn "emergency teardown of #{@children.length} job(s)" @children.clone.each do |pid, child| send_signal_and_maybe_clean_up(child, "TERM") end + + # Wait 2 seconds sleep(2) + @children.clone.each do |pid, child| send_signal_and_maybe_clean_up(child, "KILL") # force job down finish_job(child) @@ -454,13 +471,14 @@ jobs = assigned_jobs(record) if jobs.length == 0 logger.detail "no jobs to remove" return end + logger.info "number of old jobs to sift through: #{jobs.length}" jobs.each do |job| - logger.detail escape_html("job still around: #{job}") + logger.detail escape_html("job still around: #{job.inspect}") if job.request_to_terminate == false logger.warn "politely asking process: #{job.pid} to terminate itself" job.request_to_terminate = true job.save! end @@ -480,11 +498,11 @@ if jobs.length == 0 logger.debug_gross "assigned jobs have exited after asking to terminate nicely" return end jobs.each do |job| - logger.warn escape_html("sending SIG_TERM to process: #{job}") + logger.warn escape_html("sending SIG_TERM to process: #{job.inspect}") send_signal_and_maybe_clean_up(job, "TERM") end # wait (1..5).each do |i| @@ -494,11 +512,11 @@ sleep(1) end # kill with fire assigned_jobs(record).each do |job| - logger.alarm escape_html("sending SIG_KILL to process: #{job}") + logger.alarm escape_html("sending SIG_KILL to process: #{job.inspect}") send_signal_and_maybe_clean_up(job, "KILL") # job force job down finish_job(job) end @@ -520,10 +538,11 @@ # job does not exist -- mark it finished finish_job(job) return false end + return true end def is_job_process_alive?(job) return send_signal_and_maybe_clean_up(job, 0) @@ -543,13 +562,26 @@ def memory_available_to_spawn? Facter.clear memory_size = Facter.memorysize_mb.to_f memory_free = Facter.memoryfree_mb.to_f + memory_free_percentage = ((memory_free + sreclaimable_memory) / memory_size) * 100.0 - # Linux breaks out kernel cache-use memory into an SReclaimable stat - # in /proc/meminfo which should be counted as free, but facter does not. + if (memory_free_percentage >= @minimum_memory_free) + logger.detail "memory available: #{memory_free_percentage}% (free) >= " + + "#{@minimum_memory_free}% (min percent)" + return true + end + logger.alarm "#{Facter.hostname}.#{Facter.domain}: not enough memory to spawn: " + + "#{memory_free_percentage}% (free) < #{@minimum_memory_free}% (min percent)" + + return false + end + + # Linux breaks out kernel cache-use memory into an SReclaimable stat + # in /proc/meminfo which should be counted as free, but facter does not. + def sreclaimable_memory sreclaimable = 0.0 begin File.readlines('/proc/meminfo').each do |l| if l =~ /^(?:SReclaimable):\s+(\d+)\s+\S+/ # Convert the memory from Kilobytes to Gigabytes and @@ -559,20 +591,10 @@ end end rescue end - memory_free_percentage = ((memory_free + sreclaimable) / memory_size) * 100.0 - - if (memory_free_percentage >= @minimum_memory_free) - logger.detail "memory available: #{memory_free_percentage}% (free) >= " + - "#{@minimum_memory_free}% (min percent)" - return true - end - logger.alarm "#{Facter.hostname}.#{Facter.domain}: not enough memory to spawn: " + - "#{memory_free_percentage}% (free) < #{@minimum_memory_free}% (min percent)" - - return false + sreclaimable end def escape_html(str) CGI::escapeHTML(str) end