lib/pitchfork/http_server.rb in pitchfork-0.10.0 vs lib/pitchfork/http_server.rb in pitchfork-0.11.0

- old
+ new

@@ -72,12 +72,12 @@ self end end # :stopdoc: - attr_accessor :app, :timeout, :soft_timeout, :cleanup_timeout, :spawn_timeout, :worker_processes, - :after_worker_fork, :after_mold_fork, + attr_accessor :app, :timeout, :timeout_signal, :soft_timeout, :cleanup_timeout, :spawn_timeout, :worker_processes, + :before_fork, :after_worker_fork, :after_mold_fork, :listener_opts, :children, :orig_app, :config, :ready_pipe, :default_middleware, :early_hints attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete, :refork_condition, :after_worker_timeout, :after_worker_hard_timeout, :after_monitor_ready @@ -199,11 +199,11 @@ raise BootFailure, "The initial mold failed to boot" end else build_app! bind_listeners! - after_mold_fork.call(self, Worker.new(nil, pid: $$).promoted!) + after_mold_fork.call(self, Worker.new(nil, pid: $$).promoted!(@spawn_timeout)) end if sync spawn_missing_workers # We could just return here as we'd register them later in #join. @@ -313,11 +313,11 @@ rescue => e Pitchfork.log_error(@logger, "master loop error", e) end end stop # gracefully shutdown all workers on our way out - logger.info "master complete" + logger.info "master complete status=#{@exit_status}" @exit_status end def monitor_loop(sleep = true) reap_all_workers @@ -366,11 +366,11 @@ when :TTOU self.worker_processes -= 1 if self.worker_processes > 0 when Message::WorkerSpawned worker = @children.update(message) # TODO: should we send a message to the worker to acknowledge? - logger.info "worker=#{worker.nr} pid=#{worker.pid} registered" + logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} registered" when Message::MoldSpawned new_mold = @children.update(message) logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned") when Message::MoldReady old_molds = @children.molds @@ -392,25 +392,31 @@ @respawn = false SharedMemory.shutting_down! 
wait_for_pending_workers self.listeners = [] limit = Pitchfork.time_now + timeout - until @children.workers.empty? || Pitchfork.time_now > limit + until @children.empty? || Pitchfork.time_now > limit if graceful @children.soft_kill_all(:TERM) else @children.hard_kill_all(:INT) end if monitor_loop(false) == StopIteration return StopIteration end end - @children.hard_kill_all(:KILL) + + @children.each do |child| + if child.pid + @children.hard_kill(@timeout_signal.call(child.pid), child) + end + end @promotion_lock.unlink end def worker_exit(worker) + logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} exiting" proc_name status: "exiting" if @before_worker_exit begin @before_worker_exit.call(self, worker) @@ -492,44 +498,50 @@ # forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File def murder_lazy_workers now = Pitchfork.time_now(true) next_sleep = @timeout - 1 - @children.workers.each do |worker| - deadline = worker.deadline + @children.each do |child| + deadline = child.deadline if 0 == deadline # worker is idle next elsif deadline > now # worker still has time time_left = deadline - now if time_left < next_sleep next_sleep = time_left end next else # worker is out of time next_sleep = 0 - hard_timeout(worker) + hard_timeout(child) end end next_sleep <= 0 ? 1 : next_sleep end - def hard_timeout(worker) - if @after_worker_hard_timeout + def hard_timeout(child) + if child.pid.nil? # Not yet registered, likely never spawned + logger.error "worker=#{child.nr} timed out during spawn, abandoning" + @children.abandon(child) + return + end + + if @after_worker_hard_timeout && !child.mold? begin - @after_worker_hard_timeout.call(self, worker) + @after_worker_hard_timeout.call(self, child) rescue => error Pitchfork.log_error(@logger, "after_worker_hard_timeout callback", error) end end - if worker.mold? - logger.error "mold pid=#{worker.pid} timed out, killing" + if child.mold?
+ logger.error "mold pid=#{child.pid} gen=#{child.generation} timed out, killing" else - logger.error "worker=#{worker.nr} pid=#{worker.pid} timed out, killing" + logger.error "worker=#{child.nr} pid=#{child.pid} gen=#{child.generation} timed out, killing" end - @children.hard_timeout(worker) # take no prisoners for hard timeout violations + @children.hard_kill(@timeout_signal.call(child.pid), child) # take no prisoners for hard timeout violations end def trigger_refork unless REFORKING_AVAILABLE logger.error("This system doesn't support PR_SET_CHILD_SUBREAPER, can't refork") @@ -561,11 +573,12 @@ # We set the deadline before spawning the child so that if for some # reason it gets stuck before reaching the worker loop, # the monitor process will kill it. worker.update_deadline(@spawn_timeout) - Pitchfork.fork_sibling do + @before_fork&.call(self) + fork_sibling("spawn_worker") do worker.pid = Process.pid after_fork_internal worker_loop(worker) worker_exit(worker) @@ -596,10 +609,12 @@ next end worker = Pitchfork::Worker.new(worker_nr) if REFORKING_AVAILABLE + worker.generation = @children.mold&.generation || 0 + unless @children.mold&.spawn_worker(worker) @logger.error("Failed to send a spawn_worker command") end else spawn_worker(worker, detach: false) @@ -782,19 +797,22 @@ @listener_opts = @orig_app = nil readers = LISTENERS.dup readers << worker trap(:QUIT) { nuke_listeners!(readers) } trap(:TERM) { nuke_listeners!(readers) } + trap(:INT) { nuke_listeners!(readers); exit!(0) } readers end def init_mold_process(mold) - proc_name role: "(gen:#{mold.generation}) mold", status: "ready" + proc_name role: "(gen:#{mold.generation}) mold", status: "init" after_mold_fork.call(self, mold) readers = [mold] trap(:QUIT) { nuke_listeners!(readers) } trap(:TERM) { nuke_listeners!(readers) } + trap(:INT) { nuke_listeners!(readers); exit!(0) } + proc_name role: "(gen:#{mold.generation}) mold", status: "ready" readers end if Pitchfork.const_defined?(:Waiter) def 
prep_readers(readers) @@ -829,11 +847,11 @@ client = false if client == :wait_readable if client case client when Message::PromoteWorker if Info.fork_safe? - spawn_mold(worker.generation) + spawn_mold(worker) else logger.error("worker=#{worker.nr} gen=#{worker.generation} is no longer fork safe, can't refork") end when Message worker.update(client) @@ -850,12 +868,12 @@ worker.update_deadline(@timeout) if @refork_condition && Info.fork_safe? && !worker.outdated? if @refork_condition.met?(worker, logger) proc_name status: "requests: #{worker.requests_count}, spawning mold" - if spawn_mold(worker.generation) - logger.info("Refork condition met, promoting ourselves") + if spawn_mold(worker) + logger.info("worker=#{worker.nr} gen=#{worker.generation} Refork condition met, promoting ourselves") end @refork_condition.backoff! end end @@ -865,17 +883,21 @@ Pitchfork.log_error(@logger, "listen loop error", e) if readers[0] end end end - def spawn_mold(current_generation) + def spawn_mold(worker) return false unless @promotion_lock.try_lock + worker.update_deadline(@spawn_timeout) + + @before_fork&.call(self) + begin - Pitchfork.fork_sibling do - mold = Worker.new(nil, pid: Process.pid, generation: current_generation) - mold.promote! + fork_sibling("spawn_mold") do + mold = Worker.new(nil, pid: Process.pid, generation: worker.generation) + mold.promote!(@spawn_timeout) mold.start_promotion(@control_socket[1]) mold_loop(mold) end rescue # HACK: we need to call this on error or on no error, but not on throw @@ -905,12 +927,22 @@ message = sock.accept_nonblock(exception: false) case message when false # no message, keep looping when Message::SpawnWorker + retries = 1 begin spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true) + rescue ForkFailure + if retries > 0 + @logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a worker. 
Retrying.") + retries -= 1 + retry + else + @logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a worker twice in a row. Corrupted mold process?") + Process.exit(1) + end rescue => error raise BootFailure, error.message end else logger.error("Unexpected mold message #{message.inspect}") @@ -974,8 +1006,64 @@ def prepare_timeout(worker) handler = TimeoutHandler.new(self, worker, @after_worker_timeout) handler.timeout_request = SoftTimeout.request(@soft_timeout, handler) handler + end + + FORK_TIMEOUT = 5 + + def fork_sibling(role, &block) + if REFORKING_AVAILABLE + r, w = Pitchfork::Info.keep_ios(IO.pipe) + # We double fork so that the new worker is re-attached back + # to the master. + # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4 + # or the master to be PID 1. + if middle_pid = FORK_LOCK.synchronize { Process.fork } # parent + w.close + # We need to wait(2) so that the middle process doesn't end up a zombie. + # The process only calls fork again and exits, so it should be pretty fast. + # However it might need to execute some `Process._fork` or `at_exit` callbacks, + # as well as Ruby's cleanup procedure to run finalizers etc, and there is a risk + # of deadlock. + # So in case it takes more than 5 seconds to exit, we kill it.
+ # TODO: rather than to busy loop here, we handle it in the worker/mold loop + process_wait_with_timeout(middle_pid, FORK_TIMEOUT) + pid_str = r.gets + r.close + if pid_str + Integer(pid_str) + else + raise ForkFailure, "fork_sibling didn't succeed in #{FORK_TIMEOUT} seconds" + end + else # first child + r.close + Process.setproctitle("<pitchfork fork_sibling(#{role})>") + pid = Pitchfork.clean_fork do + # detach into a grand child + w.close + yield + end + w.puts(pid) + w.close + exit + end + else + clean_fork(&block) + end + end + + def process_wait_with_timeout(pid, timeout) + (timeout * 50).times do + _, status = Process.waitpid2(pid, Process::WNOHANG) + return status if status + sleep 0.02 # 50 * 20ms => 1s + end + + # The process didn't exit in the allotted time, so we kill it. + Process.kill(@timeout_signal.call(pid), pid) + _, status = Process.waitpid2(pid) + status end end end