lib/pitchfork/http_server.rb in pitchfork-0.2.0 vs lib/pitchfork/http_server.rb in pitchfork-0.3.0
- old
+ new
@@ -8,11 +8,11 @@
# Listener sockets are started in the master process and shared with
# forked worker children.
class HttpServer
# :stopdoc:
attr_accessor :app, :timeout, :worker_processes,
- :after_fork, :after_promotion,
+ :after_worker_fork, :after_mold_fork,
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
attr_writer :after_worker_exit, :after_worker_ready, :refork_condition
@@ -25,12 +25,10 @@
# in new projects
LISTENERS = []
NOOP = '.'
- REFORKING_AVAILABLE = Pitchfork::CHILD_SUBREAPER_AVAILABLE || Process.pid == 1
-
# :startdoc:
# This Hash is considered a stable interface and changing its contents
# will allow you to switch between different installations of Pitchfork
# or even different installations of the same applications without
# downtime. Keys of this constant Hash are described as follows:
@@ -61,11 +59,11 @@
# incoming requests on the socket.
def initialize(app, options = {})
@exit_status = 0
@app = app
@respawn = false
- @last_check = time_now
+ @last_check = Pitchfork.time_now
@promotion_lock = Flock.new("pitchfork-promotion")
options = options.dup
@ready_pipe = options.delete(:ready_pipe)
@init_listeners = options[:listeners].dup || []
@@ -129,11 +127,11 @@
raise BootFailure, "The initial mold failed to boot"
end
else
build_app!
bind_listeners!
- after_promotion.call(self, Worker.new(nil, pid: $$).promoted!)
+ after_mold_fork.call(self, Worker.new(nil, pid: $$).promoted!)
end
if sync
spawn_missing_workers
# We could just return here as we'd register them later in #join.
@@ -176,11 +174,11 @@
Pitchfork::HttpParser::DEFAULTS["rack.logger"] = @logger = obj
end
# add a given address to the +listeners+ set, idempotently
# Allows workers to add a private, per-process listener via the
- # after_fork hook. Very useful for debugging and testing.
+ # after_worker_fork hook. Very useful for debugging and testing.
# +:tries+ may be specified as an option for the number of times
# to retry, and +:delay+ may be specified as the time in seconds
# to delay between retries.
# A negative value for +:tries+ indicates the listen will be
# retried indefinitely, this is useful when workers belonging to
@@ -257,11 +255,11 @@
case message = @sig_queue.shift
when nil
# avoid murdering workers after our master process (or the
# machine) comes out of suspend/hibernation
- if (@last_check + @timeout) >= (@last_check = time_now)
+ if (@last_check + @timeout) >= (@last_check = Pitchfork.time_now)
sleep_time = murder_lazy_workers
else
sleep_time = @timeout/2.0 + 1
@logger.debug("waiting #{sleep_time}s after suspend/hibernation")
end
@@ -287,38 +285,42 @@
self.worker_processes -= 1 if self.worker_processes > 0
when Message::WorkerSpawned
worker = @children.update(message)
# TODO: should we send a message to the worker to acknowledge?
logger.info "worker=#{worker.nr} pid=#{worker.pid} registered"
- when Message::WorkerPromoted
+ when Message::MoldSpawned
+ new_mold = @children.update(message)
+ logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned")
+ when Message::MoldReady
old_molds = @children.molds
- new_mold = @children.fetch(message.pid)
- logger.info("worker=#{new_mold.nr} pid=#{new_mold.pid} promoted to a mold")
- @children.update(message)
+ new_mold = @children.update(message)
+ logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready")
old_molds.each do |old_mold|
- logger.info("Terminating old mold pid=#{old_mold.pid}")
+ logger.info("Terminating old mold pid=#{old_mold.pid} gen=#{old_mold.generation}")
old_mold.soft_kill(:QUIT)
end
else
logger.error("Unexpected message in sig_queue #{message.inspect}")
logger.error(@sig_queue.inspect)
end
end
# Terminates all workers, but does not exit master process
def stop(graceful = true)
+ @respawn = false
wait_for_pending_workers
self.listeners = []
- limit = time_now + timeout
- until @children.workers.empty? || time_now > limit
+ limit = Pitchfork.time_now + timeout
+ until @children.workers.empty? || Pitchfork.time_now > limit
if graceful
soft_kill_each_child(:QUIT)
else
kill_each_child(:TERM)
end
- sleep(0.1)
- reap_all_workers
+ if monitor_loop(false) == StopIteration
+ return StopIteration
+ end
end
kill_each_child(:KILL)
@promotion_lock.unlink
end
@@ -391,11 +393,11 @@
end
# forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File
def murder_lazy_workers
next_sleep = @timeout - 1
- now = time_now.to_i
+ now = Pitchfork.time_now(true)
@children.workers.each do |worker|
tick = worker.tick
0 == tick and next # skip workers that haven't processed any clients
diff = now - tick
tmp = @timeout - diff
@@ -442,47 +444,32 @@
end
def spawn_worker(worker, detach:)
logger.info("worker=#{worker.nr} gen=#{worker.generation} spawning...")
- pid = Pitchfork.clean_fork do
- # We double fork so that the new worker is re-attached back
- # to the master.
- # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4
- # or the master to be PID 1.
- if detach && fork
- exit
- end
+ Pitchfork.fork_sibling do
worker.pid = Process.pid
after_fork_internal
worker_loop(worker)
- if worker.mold?
- mold_loop(worker)
- end
- exit
end
- if detach
- # If we double forked, we need to wait(2) so that the middle
- # process doesn't end up a zombie.
- Process.wait(pid)
- end
-
worker
end
def spawn_initial_mold
mold = Worker.new(nil)
mold.create_socketpair!
mold.pid = Pitchfork.clean_fork do
+ mold.pid = Process.pid
@promotion_lock.try_lock
mold.after_fork_in_child
build_app!
bind_listeners!
mold_loop(mold)
end
+ @promotion_lock.at_fork
@children.register_mold(mold)
end
def spawn_missing_workers
worker_nr = -1
@@ -492,11 +479,11 @@
end
worker = Pitchfork::Worker.new(worker_nr)
if REFORKING_AVAILABLE
unless @children.mold&.spawn_worker(worker)
- @logger.error("Failed to send a spawn_woker command")
+ @logger.error("Failed to send a spawn_worker command")
end
else
spawn_worker(worker, detach: false)
end
# We could directly register workers when we spawn from the
@@ -527,21 +514,22 @@
def restart_outdated_workers
# If we're already in the middle of forking a new generation, we just continue
return unless @children.mold
- # We don't shutdown any outdated worker if any worker is already being spawned
- # or a worker is exiting. Workers are only reforked one by one to minimize the
- # impact on capacity.
- # In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at
- # a time.
- return if @children.pending_workers?
- return if @children.workers.any?(&:exiting?)
+ # We don't shutdown any outdated worker if any worker is already being
+ # spawned or a worker is exiting. Only 10% of workers can be reforked at
+ # once to minimize the impact on capacity.
+ max_pending_workers = (worker_processes * 0.1).ceil
+ workers_to_restart = max_pending_workers - @children.restarting_workers_count
- if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation }
- logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting")
- outdated_worker.soft_kill(:QUIT)
+ if workers_to_restart > 0
+ outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation }
+ outdated_workers.each do |worker|
+ logger.info("worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} restarting")
+ worker.soft_kill(:QUIT)
+ end
end
end
# if we get any error, try to write something back to the client
# assuming we haven't closed the socket, but don't get hung up
@@ -646,28 +634,28 @@
exit_sigs.each { |sig| trap(sig) { exit!(0) } }
exit!(0) if (@sig_queue & exit_sigs)[0]
(@queue_sigs - exit_sigs).each { |sig| trap(sig, nil) }
trap(:CHLD, 'DEFAULT')
@sig_queue.clear
- proc_name "worker[#{worker.nr}] (gen:#{worker.generation})"
+ proc_name "(gen:#{worker.generation}) worker[#{worker.nr}]"
@children = nil
- after_fork.call(self, worker) # can drop perms and create listeners
+ after_worker_fork.call(self, worker) # can drop perms and create listeners
LISTENERS.each { |sock| sock.close_on_exec = true }
@config = nil
@listener_opts = @orig_app = nil
readers = LISTENERS.dup
readers << worker
trap(:QUIT) { nuke_listeners!(readers) }
readers
end
- def init_mold_process(worker)
- proc_name "mold (gen: #{worker.generation})"
- after_promotion.call(self, worker)
- readers = [worker]
+ def init_mold_process(mold)
+ proc_name "(gen: #{mold.generation}) mold"
+ after_mold_fork.call(self, mold)
+ readers = [mold]
trap(:QUIT) { nuke_listeners!(readers) }
readers
end
if Pitchfork.const_defined?(:Waiter)
@@ -689,87 +677,103 @@
waiter = prep_readers(readers)
ready = readers.dup
@after_worker_ready.call(self, worker)
- begin
- worker.tick = time_now.to_i
- while sock = ready.shift
- # Pitchfork::Worker#accept_nonblock is not like accept(2) at all,
- # but that will return false
- client = sock.accept_nonblock(exception: false)
- client = false if client == :wait_readable
- if client
- case client
- when Message::PromoteWorker
- if @promotion_lock.try_lock
- logger.info("Refork asked by master, promoting ourselves")
- worker.tick = time_now.to_i
- return worker.promoted!
+ while readers[0]
+ begin
+ worker.tick = Pitchfork.time_now(true)
+ while sock = ready.shift
+ # Pitchfork::Worker#accept_nonblock is not like accept(2) at all,
+ # but that will return false
+ client = sock.accept_nonblock(exception: false)
+ client = false if client == :wait_readable
+ if client
+ case client
+ when Message::PromoteWorker
+ spawn_mold(worker.generation)
+ when Message
+ worker.update(client)
+ else
+ process_client(client)
+ worker.increment_requests_count
end
- when Message
- worker.update(client)
- else
- process_client(client)
- worker.increment_requests_count
+ worker.tick = Pitchfork.time_now(true)
end
- worker.tick = time_now.to_i
end
- end
- # timeout so we can .tick and keep parent from SIGKILL-ing us
- worker.tick = time_now.to_i
- if @refork_condition && !worker.outdated?
- if @refork_condition.met?(worker, logger)
- if @promotion_lock.try_lock
- logger.info("Refork condition met, promoting ourselves")
- return worker.promote! # We've been promoted we can exit the loop
- else
- # TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock
+ # timeout so we can .tick and keep parent from SIGKILL-ing us
+ worker.tick = Pitchfork.time_now(true)
+
+ if @refork_condition && !worker.outdated?
+ if @refork_condition.met?(worker, logger)
+ if spawn_mold(worker.generation)
+ logger.info("Refork condition met, promoting ourselves")
+ end
+ @refork_condition.backoff!
end
end
+
+ waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
+ rescue => e
+ Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
end
+ end
+ end
- waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
- rescue => e
- Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
- end while readers[0]
+ def spawn_mold(current_generation)
+ return false unless @promotion_lock.try_lock
+
+ begin
+ Pitchfork.fork_sibling do
+ mold = Worker.new(nil, pid: Process.pid, generation: current_generation)
+ mold.promote!
+ mold.start_promotion(@control_socket[1])
+ mold_loop(mold)
+ end
+ true
+ ensure
+ @promotion_lock.at_fork # We let the spawned mold own the lock
+ end
end
def mold_loop(mold)
readers = init_mold_process(mold)
waiter = prep_readers(readers)
- mold.declare_promotion(@control_socket[1])
@promotion_lock.unlock
ready = readers.dup
- begin
- mold.tick = time_now.to_i
- while sock = ready.shift
- # Pitchfork::Worker#accept_nonblock is not like accept(2) at all,
- # but that will return false
- message = sock.accept_nonblock(exception: false)
- case message
- when false
- # no message, keep looping
- when Message::SpawnWorker
- begin
- spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true)
- rescue => error
- raise BootFailure, error.message
+ mold.finish_promotion(@control_socket[1])
+
+ while readers[0]
+ begin
+ mold.tick = Pitchfork.time_now(true)
+ while sock = ready.shift
+ # Pitchfork::Worker#accept_nonblock is not like accept(2) at all,
+ # but that will return false
+ message = sock.accept_nonblock(exception: false)
+ case message
+ when false
+ # no message, keep looping
+ when Message::SpawnWorker
+ begin
+ spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true)
+ rescue => error
+ raise BootFailure, error.message
+ end
+ else
+ logger.error("Unexpected mold message #{message.inspect}")
end
- else
- logger.error("Unexpected mold message #{message.inspect}")
end
- end
- # timeout so we can .tick and keep parent from SIGKILL-ing us
- mold.tick = time_now.to_i
- waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
- rescue => e
- Pitchfork.log_error(@logger, "mold loop error", e) if readers[0]
- end while readers[0]
+ # timeout so we can .tick and keep parent from SIGKILL-ing us
+ mold.tick = Pitchfork.time_now(true)
+ waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
+ rescue => e
+ Pitchfork.log_error(@logger, "mold loop error", e) if readers[0]
+ end
+ end
end
# delivers a signal to a worker and fails gracefully if the worker
# is no longer running.
def kill_worker(signal, wpid)
@@ -817,12 +821,8 @@
@init_listeners << Pitchfork::Const::DEFAULT_LISTEN
START_CTX[:argv] << "-l#{Pitchfork::Const::DEFAULT_LISTEN}"
end
listeners.each { |addr| listen(addr) }
raise ArgumentError, "no listeners" if LISTENERS.empty?
- end
-
- def time_now
- Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end