app/models/process/naf/runner.rb in naf-2.1.3 vs app/models/process/naf/runner.rb in naf-2.1.4

- old
+ new

@@ -182,11 +182,10 @@
      terminate_old_processes(machine) if @kill_all_runners

      logger.info escape_html("working: #{machine}")

      @children = {}
-      @threads = {}

      at_exit {
        ::Af::Application.singleton.emergency_teardown
      }

@@ -347,15 +346,10 @@
          if status.nil? || status.exited? || status.signaled?
            logger.info { escape_html("cleaning up dead child: #{child_job.reload}") }
            finish_job(child_job,
                       { exit_status: (status && status.exitstatus),
                         termination_signal: (status && status.termsig) })
-
-            thread = @threads.delete(pid)
-            logger.detail escape_html("cleaning up threads: #{thread.inspect}")
-            logger.detail escape_html("thread list: #{Thread.list}")
-            thread.join
          else
            # this can happen if the child is sigstopped
            logger.warn escape_html("child waited for did not exit: #{child_job}, status: #{status.inspect}")
          end
        else
@@ -380,36 +374,20 @@
              break
            end

            logger.info escape_html("starting new job : #{running_job.inspect}")

-            # fork and run
-            pid, stdin, stdout, stderr = running_job.historical_job.spawn
-            stdin.close
-
-            # Reset NAF_JOB_ID
-            ENV.delete('NAF_JOB_ID')
-            if pid
+            pid = running_job.historical_job.spawn
+            if pid.present?
              @children[pid] = running_job
              running_job.pid = pid
              running_job.historical_job.pid = pid
              running_job.historical_job.failed_to_start = false
              running_job.historical_job.machine_runner_invocation_id = current_invocation.id
              logger.info escape_html("job started : #{running_job}")
              running_job.save!
              running_job.historical_job.save!
-
-              # Spawn a thread to output the log of each job to files.
-              #
-              # Make sure not to execute any database calls inside this
-              # block, as it will start an ActiveRecord connection for each
-              # thread and eventually raise a ConnetionTimeoutError, resulting
-              # the runner to exit.
-              thread = Thread.new do
-                log_output_until_job_finishes(running_job.id, stdout, stderr)
-              end
-              @threads[pid] = thread
            else
              # should never get here (well, hopefully)
              logger.error escape_html("#{machine}: failed to execute #{running_job}")
              finish_job(running_job,
                         { failed_to_start: true })
@@ -421,58 +399,9 @@
            logger.error escape_html("#{machine}: failure during job start")
            logger.warn e
          end
        end
      logger.debug_gross "done starting jobs"
-    end
-
-    def log_output_until_job_finishes(job_id, stdout, stderr)
-      log_file = ::Logical::Naf::LogFile.new("#{::Naf::PREFIX_PATH}/#{::Naf.schema_name}/jobs/#{job_id}")
-      log_file.open
-
-      # Continue reading logs from stdout/stderror until it reaches end of file
-      while true
-        read_pipes = []
-        read_pipes << stdout if stdout
-        read_pipes << stderr if stderr
-        return if (read_pipes.length == 0)
-
-        error_pipes = read_pipes.clone
-        read_array, write_array, error_array = Kernel.select(read_pipes, nil, error_pipes, 1)
-
-        unless error_array.blank?
-          logger.error escape_html("job(#{job_id}): select returned error for #{error_pipes.inspect} (read_pipes: #{read_pipes.inspect})")
-          # XXX we should probably close the errored FDs
-        end
-
-        unless read_array.blank?
-          begin
-            for r in read_array do
-              begin
-                # Parse each log line into JSON
-                r.read_nonblock(10240).split("\n").each do |log|
-                  log_file << log.rstrip
-                end
-              rescue Errno::EAGAIN => e
-                logger.error 'EAGAIN'
-                logger.error "#{e.inspect}"
-                logger.error "#{e.message}"
-              rescue Errno::EWOULDBLOCK => ew
-                logger.error 'EWOULDBLOCK'
-                logger.error "#{ew.inspect}"
-                logger.error "#{ew.message}"
-              rescue EOFError => eof
-                stdout = nil if r == stdout
-                stderr = nil if r == stderr
-              end
-            end
-          ensure
-            log_file.write
-          end
-        end
-      end
-
-      log_file.close
    end

    # XXX update_all doesn't support "from_partition" so we have this helper
    def update_historical_job(updates, historical_job_id)
      updates[:updated_at] = Time.zone.now
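Note: for context, the per-job log forwarding that 2.1.4 removes boils down to watching the child's stdout/stderr pipes with select and non-blocking reads from a dedicated thread. A rough standalone sketch of that pattern follows; the command, log path, and helper name are placeholders for illustration, not naf's API.

    # Standalone sketch of the removed log-forwarding pattern (placeholder names,
    # not naf's API): drain a child's stdout/stderr without blocking, append to a file.
    def forward_output(stdout, stderr, log_path)
      pipes = [stdout, stderr]
      File.open(log_path, 'a') do |log_file|
        until pipes.empty?
          ready, = IO.select(pipes, nil, nil, 1)   # wait up to 1s for readable pipes
          next if ready.nil?

          ready.each do |pipe|
            begin
              pipe.read_nonblock(10_240).each_line { |line| log_file.puts(line.rstrip) }
            rescue IO::WaitReadable
              # nothing to read right now; try again on the next select
            rescue EOFError
              pipes.delete(pipe)                   # child closed this end of the pipe
            end
          end
        end
      end
    end

    # Usage: spawn a child with its own pipes and forward its output from a thread,
    # roughly what the removed Thread.new { log_output_until_job_finishes(...) } did.
    out_r, out_w = IO.pipe
    err_r, err_w = IO.pipe
    pid = Process.spawn('echo hello; echo oops 1>&2', out: out_w, err: err_w)
    out_w.close
    err_w.close

    log_thread = Thread.new { forward_output(out_r, err_r, 'job.log') }
    Process.wait(pid)
    log_thread.join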