app/models/process/naf/runner.rb in naf-2.1.3 vs app/models/process/naf/runner.rb in naf-2.1.4
- old
+ new
@@ -182,11 +182,10 @@
terminate_old_processes(machine) if @kill_all_runners
logger.info escape_html("working: #{machine}")
@children = {}
- @threads = {}
at_exit {
::Af::Application.singleton.emergency_teardown
}
@@ -347,15 +346,10 @@
if status.nil? || status.exited? || status.signaled?
logger.info { escape_html("cleaning up dead child: #{child_job.reload}") }
finish_job(child_job,
{ exit_status: (status && status.exitstatus), termination_signal: (status && status.termsig) })
-
- thread = @threads.delete(pid)
- logger.detail escape_html("cleaning up threads: #{thread.inspect}")
- logger.detail escape_html("thread list: #{Thread.list}")
- thread.join
else
# this can happen if the child is sigstopped
logger.warn escape_html("child waited for did not exit: #{child_job}, status: #{status.inspect}")
end
else
@@ -380,36 +374,20 @@
break
end
logger.info escape_html("starting new job : #{running_job.inspect}")
- # fork and run
- pid, stdin, stdout, stderr = running_job.historical_job.spawn
- stdin.close
-
- # Reset NAF_JOB_ID
- ENV.delete('NAF_JOB_ID')
- if pid
+ pid = running_job.historical_job.spawn
+ if pid.present?
@children[pid] = running_job
running_job.pid = pid
running_job.historical_job.pid = pid
running_job.historical_job.failed_to_start = false
running_job.historical_job.machine_runner_invocation_id = current_invocation.id
logger.info escape_html("job started : #{running_job}")
running_job.save!
running_job.historical_job.save!
-
- # Spawn a thread to output the log of each job to files.
- #
- # Make sure not to execute any database calls inside this
- # block, as it will start an ActiveRecord connection for each
- # thread and eventually raise a ConnetionTimeoutError, resulting
- # the runner to exit.
- thread = Thread.new do
- log_output_until_job_finishes(running_job.id, stdout, stderr)
- end
- @threads[pid] = thread
else
# should never get here (well, hopefully)
logger.error escape_html("#{machine}: failed to execute #{running_job}")
finish_job(running_job, { failed_to_start: true })
@@ -421,58 +399,9 @@
logger.error escape_html("#{machine}: failure during job start")
logger.warn e
end
end
logger.debug_gross "done starting jobs"
- end
-
- def log_output_until_job_finishes(job_id, stdout, stderr)
- log_file = ::Logical::Naf::LogFile.new("#{::Naf::PREFIX_PATH}/#{::Naf.schema_name}/jobs/#{job_id}")
- log_file.open
-
- # Continue reading logs from stdout/stderror until it reaches end of file
- while true
- read_pipes = []
- read_pipes << stdout if stdout
- read_pipes << stderr if stderr
- return if (read_pipes.length == 0)
-
- error_pipes = read_pipes.clone
- read_array, write_array, error_array = Kernel.select(read_pipes, nil, error_pipes, 1)
-
- unless error_array.blank?
- logger.error escape_html("job(#{job_id}): select returned error for #{error_pipes.inspect} (read_pipes: #{read_pipes.inspect})")
- # XXX we should probably close the errored FDs
- end
-
- unless read_array.blank?
- begin
- for r in read_array do
- begin
- # Parse each log line into JSON
- r.read_nonblock(10240).split("\n").each do |log|
- log_file << log.rstrip
- end
- rescue Errno::EAGAIN => e
- logger.error 'EAGAIN'
- logger.error "#{e.inspect}"
- logger.error "#{e.message}"
- rescue Errno::EWOULDBLOCK => ew
- logger.error 'EWOULDBLOCK'
- logger.error "#{ew.inspect}"
- logger.error "#{ew.message}"
- rescue EOFError => eof
- stdout = nil if r == stdout
- stderr = nil if r == stderr
- end
- end
- ensure
- log_file.write
- end
- end
- end
-
- log_file.close
end
# XXX update_all doesn't support "from_partition" so we have this helper
def update_historical_job(updates, historical_job_id)
updates[:updated_at] = Time.zone.now