lib/pitchfork/http_server.rb in pitchfork-0.10.0 vs lib/pitchfork/http_server.rb in pitchfork-0.11.0
- old
+ new
@@ -72,12 +72,12 @@
self
end
end
# :stopdoc:
- attr_accessor :app, :timeout, :soft_timeout, :cleanup_timeout, :spawn_timeout, :worker_processes,
- :after_worker_fork, :after_mold_fork,
+ attr_accessor :app, :timeout, :timeout_signal, :soft_timeout, :cleanup_timeout, :spawn_timeout, :worker_processes,
+ :before_fork, :after_worker_fork, :after_mold_fork,
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete,
:refork_condition, :after_worker_timeout, :after_worker_hard_timeout, :after_monitor_ready
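
# Two new hooks land in this release: `before_fork`, an optional callable
# invoked with the server right before any worker or mold is forked (see the
# `@before_fork&.call(self)` sites below), and `timeout_signal`, a callable
# mapping a child pid to the signal used to kill it on timeout. A minimal
# sketch, assuming `server` is the HttpServer instance (the config DSL wiring
# is outside this diff):

server.before_fork = ->(server) do
  # e.g. close connections or descriptors that must not leak into the child
end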
@@ -199,11 +199,11 @@
raise BootFailure, "The initial mold failed to boot"
end
else
build_app!
bind_listeners!
- after_mold_fork.call(self, Worker.new(nil, pid: $$).promoted!)
+ after_mold_fork.call(self, Worker.new(nil, pid: $$).promoted!(@spawn_timeout))
end
if sync
spawn_missing_workers
# We could just return here as we'd register them later in #join.
@@ -313,11 +313,11 @@
rescue => e
Pitchfork.log_error(@logger, "master loop error", e)
end
end
stop # gracefully shutdown all workers on our way out
- logger.info "master complete"
+ logger.info "master complete status=#{@exit_status}"
@exit_status
end
def monitor_loop(sleep = true)
reap_all_workers
@@ -366,11 +366,11 @@
when :TTOU
self.worker_processes -= 1 if self.worker_processes > 0
when Message::WorkerSpawned
worker = @children.update(message)
# TODO: should we send a message to the worker to acknowledge?
- logger.info "worker=#{worker.nr} pid=#{worker.pid} registered"
+ logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} registered"
when Message::MoldSpawned
new_mold = @children.update(message)
logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned")
when Message::MoldReady
old_molds = @children.molds
@@ -392,25 +392,31 @@
@respawn = false
SharedMemory.shutting_down!
wait_for_pending_workers
self.listeners = []
limit = Pitchfork.time_now + timeout
- until @children.workers.empty? || Pitchfork.time_now > limit
+ until @children.empty? || Pitchfork.time_now > limit
if graceful
@children.soft_kill_all(:TERM)
else
@children.hard_kill_all(:INT)
end
if monitor_loop(false) == StopIteration
return StopIteration
end
end
- @children.hard_kill_all(:KILL)
+
+ @children.each do |child|
+ if child.pid
+ @children.hard_kill(@timeout_signal.call(child.pid), child)
+ end
+ end
@promotion_lock.unlink
end
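
# Shutdown now drains every child, molds included (`@children.empty?` rather
# than `@children.workers.empty?`), and the final escalation asks
# `timeout_signal` which signal each survivor should get. Since the callable
# receives the pid, it can choose something more diagnostic than :KILL; a
# hedged sketch (assumes you want a core dump from stuck children):

server.timeout_signal = ->(pid) { :ABRT } # aborts with a core dump where enabled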
def worker_exit(worker)
+ logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} exiting"
proc_name status: "exiting"
if @before_worker_exit
begin
@before_worker_exit.call(self, worker)
@@ -492,44 +498,50 @@
# forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File
def murder_lazy_workers
now = Pitchfork.time_now(true)
next_sleep = @timeout - 1
- @children.workers.each do |worker|
- deadline = worker.deadline
+ @children.each do |child|
+ deadline = child.deadline
if 0 == deadline # worker is idle
next
elsif deadline > now # worker still has time
time_left = deadline - now
if time_left < next_sleep
next_sleep = time_left
end
next
else # worker is out of time
next_sleep = 0
- hard_timeout(worker)
+ hard_timeout(child)
end
end
next_sleep <= 0 ? 1 : next_sleep
end
- def hard_timeout(worker)
- if @after_worker_hard_timeout
+ def hard_timeout(child)
+ if child.pid.nil? # Not yet registered, likely never spawned
+ logger.error "worker=#{child.nr} timed out during spawn, abandoning"
+ @children.abandon(child)
+ return
+ end
+
+ if @after_worker_hard_timeout && !child.mold?
begin
- @after_worker_hard_timeout.call(self, worker)
+ @after_worker_hard_timeout.call(self, child)
rescue => error
Pitchfork.log_error(@logger, "after_worker_hard_timeout callback", error)
end
end
- if worker.mold?
- logger.error "mold pid=#{worker.pid} timed out, killing"
+ if child.mold?
+ logger.error "mold pid=#{child.pid} gen=#{child.generation} timed out, killing"
else
- logger.error "worker=#{worker.nr} pid=#{worker.pid} timed out, killing"
+ logger.error "worker=#{child.nr} pid=#{child.pid} gen=#{child.generation} timed out, killing"
end
- @children.hard_timeout(worker) # take no prisoners for hard timeout violations
+ @children.hard_kill(@timeout_signal.call(child.pid), child) # take no prisoners for hard timeout violations
end
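
# hard_timeout now distinguishes three cases: a child with no registered pid
# is abandoned, a mold is killed without the worker-only
# `after_worker_hard_timeout` callback, and a worker gets the callback before
# the kill. `hard_kill(signal, child)` is expected to deliver the signal and
# tolerate an already-dead pid; a plausible sketch of that contract (not the
# actual Children implementation):

def hard_kill(sig, child)
  Process.kill(sig, child.pid)
rescue Errno::ESRCH
  # already gone; reap_all_workers handles the bookkeeping
end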
def trigger_refork
unless REFORKING_AVAILABLE
logger.error("This system doesn't support PR_SET_CHILD_SUBREAPER, can't refork")
@@ -561,11 +573,12 @@
# We set the deadline before spawning the child so that if for some
# reason it gets stuck before reaching the worker loop,
# the monitor process will kill it.
worker.update_deadline(@spawn_timeout)
- Pitchfork.fork_sibling do
+ @before_fork&.call(self)
+ fork_sibling("spawn_worker") do
worker.pid = Process.pid
after_fork_internal
worker_loop(worker)
worker_exit(worker)
@@ -596,10 +609,12 @@
next
end
worker = Pitchfork::Worker.new(worker_nr)
if REFORKING_AVAILABLE
+ worker.generation = @children.mold&.generation || 0
+
unless @children.mold&.spawn_worker(worker)
@logger.error("Failed to send a spawn_worker command")
end
else
spawn_worker(worker, detach: false)
@@ -782,19 +797,22 @@
@listener_opts = @orig_app = nil
readers = LISTENERS.dup
readers << worker
trap(:QUIT) { nuke_listeners!(readers) }
trap(:TERM) { nuke_listeners!(readers) }
+ trap(:INT) { nuke_listeners!(readers); exit!(0) }
readers
end
def init_mold_process(mold)
- proc_name role: "(gen:#{mold.generation}) mold", status: "ready"
+ proc_name role: "(gen:#{mold.generation}) mold", status: "init"
after_mold_fork.call(self, mold)
readers = [mold]
trap(:QUIT) { nuke_listeners!(readers) }
trap(:TERM) { nuke_listeners!(readers) }
+ trap(:INT) { nuke_listeners!(readers); exit!(0) }
+ proc_name role: "(gen:#{mold.generation}) mold", status: "ready"
readers
end
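
# Workers and molds now also terminate on SIGINT. `exit!`, unlike `exit`,
# skips `at_exit` hooks and finalizers, so an interrupted child dies without
# running cleanup inherited from the monitor. A quick illustration of the
# difference (standalone, not pitchfork code):

at_exit { puts "cleanup" }
trap(:INT)  { exit!(0) } # on SIGINT: dies immediately, "cleanup" never prints
trap(:TERM) { exit(0)  } # on SIGTERM: SystemExit raised, "cleanup" prints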
if Pitchfork.const_defined?(:Waiter)
def prep_readers(readers)
@@ -829,11 +847,11 @@
client = false if client == :wait_readable
if client
case client
when Message::PromoteWorker
if Info.fork_safe?
- spawn_mold(worker.generation)
+ spawn_mold(worker)
else
logger.error("worker=#{worker.nr} gen=#{worker.generation} is no longer fork safe, can't refork")
end
when Message
worker.update(client)
@@ -850,12 +868,12 @@
worker.update_deadline(@timeout)
if @refork_condition && Info.fork_safe? && !worker.outdated?
if @refork_condition.met?(worker, logger)
proc_name status: "requests: #{worker.requests_count}, spawning mold"
- if spawn_mold(worker.generation)
- logger.info("Refork condition met, promoting ourselves")
+ if spawn_mold(worker)
+ logger.info("worker=#{worker.nr} gen=#{worker.generation} Refork condition met, promoting ourselves")
end
@refork_condition.backoff!
end
end
@@ -865,17 +883,21 @@
Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
end
end
end
- def spawn_mold(current_generation)
+ def spawn_mold(worker)
return false unless @promotion_lock.try_lock
+ worker.update_deadline(@spawn_timeout)
+
+ @before_fork&.call(self)
+
begin
- Pitchfork.fork_sibling do
- mold = Worker.new(nil, pid: Process.pid, generation: current_generation)
- mold.promote!
+ fork_sibling("spawn_mold") do
+ mold = Worker.new(nil, pid: Process.pid, generation: worker.generation)
+ mold.promote!(@spawn_timeout)
mold.start_promotion(@control_socket[1])
mold_loop(mold)
end
rescue
# HACK: we need to call this on error or on no error, but not on throw
@@ -905,12 +927,22 @@
message = sock.accept_nonblock(exception: false)
case message
when false
# no message, keep looping
when Message::SpawnWorker
+ retries = 1
begin
spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true)
+ rescue ForkFailure
+ if retries > 0
+ @logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a worker. Retrying.")
+ retries -= 1
+ retry
+ else
+ @logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a worker twice in a row. Corrupted mold process?")
+ Process.exit(1)
+ end
rescue => error
raise BootFailure, error.message
end
else
logger.error("Unexpected mold message #{message.inspect}")
@@ -974,8 +1006,64 @@
def prepare_timeout(worker)
handler = TimeoutHandler.new(self, worker, @after_worker_timeout)
handler.timeout_request = SoftTimeout.request(@soft_timeout, handler)
handler
+ end
+
+ FORK_TIMEOUT = 5
+
+ def fork_sibling(role, &block)
+ if REFORKING_AVAILABLE
+ r, w = Pitchfork::Info.keep_ios(IO.pipe)
+ # We double fork so that the new worker is re-attached back
+ # to the master.
+ # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4
+ # or the master to be PID 1.
+ if middle_pid = FORK_LOCK.synchronize { Process.fork } # parent
+ w.close
+ # We need to wait(2) so that the middle process doesn't end up a zombie.
+ # The middle process only calls fork once more and exits, so it should be pretty fast.
+ # However it might need to execute some `Process._fork` or `at_exit` callbacks,
+ # as well as Ruby's cleanup procedure to run finalizers etc, and there is a risk
+ # of deadlock.
+ # So in case it takes more than 5 seconds to exit, we kill it.
+ # TODO: rather than busy looping here, handle it in the worker/mold loop
+ process_wait_with_timeout(middle_pid, FORK_TIMEOUT)
+ pid_str = r.gets
+ r.close
+ if pid_str
+ Integer(pid_str)
+ else
+ raise ForkFailure, "fork_sibling didn't succeed in #{FORK_TIMEOUT} seconds"
+ end
+ else # first child
+ r.close
+ Process.setproctitle("<pitchfork fork_sibling(#{role})>")
+ pid = Pitchfork.clean_fork do
+ # detach into a grand child
+ w.close
+ yield
+ end
+ w.puts(pid)
+ w.close
+ exit
+ end
+ else
+ clean_fork(&block)
+ end
+ end
+
+ def process_wait_with_timeout(pid, timeout)
+ (timeout * 50).times do
+ _, status = Process.waitpid2(pid, Process::WNOHANG)
+ return status if status
+ sleep 0.02 # 50 * 20ms => 1s
+ end
+
+ # The process didn't exit in the allotted time, so we kill it.
+ Process.kill(@timeout_signal.call(pid), pid)
+ _, status = Process.waitpid2(pid)
+ status
end
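
# fork_sibling is what makes reforking work: the monitor's child double-forks,
# the grandchild re-parents to the monitor (this requires PR_SET_CHILD_SUBREAPER,
# Linux >= 3.4, or the monitor running as PID 1), and the middle process writes
# the grandchild's pid to a pipe before exiting. A standalone sketch of the same
# re-parenting trick, independent of pitchfork (Linux only;
# PR_SET_CHILD_SUBREAPER is prctl option 36):

require "fiddle"

libc = Fiddle.dlopen(nil)
prctl = Fiddle::Function.new(libc["prctl"],
  [Fiddle::TYPE_INT] + [Fiddle::TYPE_LONG] * 4, Fiddle::TYPE_INT)
prctl.call(36, 1, 0, 0, 0) # PR_SET_CHILD_SUBREAPER: adopt orphaned descendants

r, w = IO.pipe
if (middle = Process.fork)
  w.close
  Process.waitpid(middle)    # reap the short-lived middle process
  sibling = Integer(r.gets)  # grandchild pid reported through the pipe
  r.close
  # The grandchild re-parented to us when the middle process exited,
  # so we can wait on it as if it were a direct child.
  p Process.waitpid2(sibling)
else
  r.close
  pid = Process.fork { w.close; sleep 0.5 } # the sibling outliving its parent
  w.puts(pid)
  w.close
  exit! # leave at once; pitchfork calls plain exit and bounds the wait with FORK_TIMEOUT
end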
end
end