lib/pitchfork/http_server.rb in pitchfork-0.1.2 vs lib/pitchfork/http_server.rb in pitchfork-0.2.0
- old
+ new
@@ -1,21 +1,22 @@
# -*- encoding: binary -*-
require 'pitchfork/pitchfork_http'
+require 'pitchfork/flock'
module Pitchfork
# This is the process manager of Pitchfork. This manages worker
# processes which in turn handle the I/O and application process.
# Listener sockets are started in the master process and shared with
# forked worker children.
class HttpServer
# :stopdoc:
attr_accessor :app, :timeout, :worker_processes,
- :before_fork, :after_fork,
+ :after_fork, :after_promotion,
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
- attr_writer :after_worker_exit, :after_worker_ready, :refork_condition, :mold_selector
+ attr_writer :after_worker_exit, :after_worker_ready, :refork_condition
attr_reader :logger
include Pitchfork::SocketHelper
include Pitchfork::HttpResponse
@@ -25,11 +26,10 @@
LISTENERS = []
NOOP = '.'
REFORKING_AVAILABLE = Pitchfork::CHILD_SUBREAPER_AVAILABLE || Process.pid == 1
- MAX_SLEEP = 1 # seconds
# :startdoc:
# This Hash is considered a stable interface and changing its contents
# will allow you to switch between different installations of Pitchfork
# or even different installations of the same applications without
@@ -58,17 +58,19 @@
# Creates a working server on host:port (strange things happen if
# port isn't a Number). Use HttpServer::run to start the server and
# HttpServer.run.join to join the thread that's processing
# incoming requests on the socket.
def initialize(app, options = {})
+ @exit_status = 0
@app = app
@respawn = false
@last_check = time_now
- @default_middleware = true
+ @promotion_lock = Flock.new("pitchfork-promotion")
+
options = options.dup
@ready_pipe = options.delete(:ready_pipe)
- @init_listeners = options[:listeners] ? options[:listeners].dup : []
+ @init_listeners = options[:listeners].dup || []
options[:use_defaults] = true
self.config = Pitchfork::Configurator.new(options)
self.listener_opts = {}
# We use @control_socket differently in the master and worker processes:
@@ -102,11 +104,11 @@
:QUIT, :INT, :TERM, :USR2, :TTIN, :TTOU ]
Worker.preallocate_drops(worker_processes)
end
# Runs the thing. Returns self so you can run join on it
- def start
+ def start(sync = true)
Pitchfork.enable_child_subreaper # noop if not supported
# This socketpair is used to wake us up from select(2) in #join when signals
# are trapped. See trap_deferred.
# It's also used by newly spawned children to send their soft_signal pipe
@@ -118,26 +120,29 @@
# trigger happy and send signals as soon as the pid file exists.
# Note that signals don't actually get handled until the #join method
@queue_sigs.each { |sig| trap(sig) { @sig_queue << sig; awaken_master } }
trap(:CHLD) { awaken_master }
- bind_listeners!
if REFORKING_AVAILABLE
spawn_initial_mold
wait_for_pending_workers
unless @children.mold
raise BootFailure, "The initial mold failed to boot"
end
else
build_app!
+ bind_listeners!
+ after_promotion.call(self, Worker.new(nil, pid: $$).promoted!)
end
- spawn_missing_workers
- # We could just return here as we'd register them later in #join.
- # However a good part of the test suite assumes #start only return
- # once all initial workers are spawned.
- wait_for_pending_workers
+ if sync
+ spawn_missing_workers
+ # We could just return here as we'd register them later in #join.
+ # However a good part of the test suite assumes #start only return
+ # once all initial workers are spawned.
+ wait_for_pending_workers
+ end
self
end
# replaces current listener set with +listeners+. This will
@@ -235,14 +240,23 @@
Pitchfork.log_error(@logger, "master loop error", e)
end
end
stop # gracefully shutdown all workers on our way out
logger.info "master complete"
+ @exit_status
end
def monitor_loop(sleep = true)
reap_all_workers
+
+ if REFORKING_AVAILABLE && @respawn && @children.molds.empty?
+ logger.info("No mold alive, shutting down")
+ @exit_status = 1
+ @sig_queue << :QUIT
+ @respawn = false
+ end
+
case message = @sig_queue.shift
when nil
# avoid murdering workers after our master process (or the
# machine) comes out of suspend/hibernation
if (@last_check + @timeout) >= (@last_check = time_now)
@@ -251,17 +265,19 @@
sleep_time = @timeout/2.0 + 1
@logger.debug("waiting #{sleep_time}s after suspend/hibernation")
end
if @respawn
maintain_worker_count
- automatically_refork_workers if REFORKING_AVAILABLE
+ restart_outdated_workers if REFORKING_AVAILABLE
end
master_sleep(sleep_time) if sleep
when :QUIT # graceful shutdown
+ logger.info "QUIT received, starting graceful shutdown"
return StopIteration
when :TERM, :INT # immediate shutdown
+ logger.info "#{message} received, starting immediate shutdown"
stop(false)
return StopIteration
when :USR2 # trigger a promotion
trigger_refork
when :TTIN
@@ -288,10 +304,11 @@
end
end
# Terminates all workers, but does not exit master process
def stop(graceful = true)
+ wait_for_pending_workers
self.listeners = []
limit = time_now + timeout
until @children.workers.empty? || time_now > limit
if graceful
soft_kill_each_child(:QUIT)
@@ -300,10 +317,11 @@
end
sleep(0.1)
reap_all_workers
end
kill_each_child(:KILL)
+ @promotion_lock.unlink
end
# Tells whether the request input class configured on the HTTP parser
# defines an instance method named +rewind+.
def rewindable_input
  input_class = Pitchfork::HttpParser.input_class
  input_class.method_defined?(:rewind)
end
@@ -331,12 +349,10 @@
private
# wait for a signal handler to wake us up and then consume the pipe
def master_sleep(sec)
- sec = MAX_SLEEP if sec > MAX_SLEEP
-
@control_socket[0].wait(sec) or return
case message = @control_socket[0].recvmsg_nonblock(exception: false)
when :wait_readable, NOOP
nil
else
@@ -349,22 +365,22 @@
@control_socket[1].sendmsg_nonblock(NOOP, exception: false) # wakeup master process from select
end
# reaps all unreaped workers
def reap_all_workers
# Collects every child that has already exited, without ever blocking the
# master. For each one, the entry is removed from @children and the
# after_worker_exit hook is invoked with the exit status.
- begin
+ loop do
# pid -1 means "any child"; with WNOHANG, waitpid2 returns nil instead of
# blocking when no child has exited yet.
wpid, status = Process.waitpid2(-1, Process::WNOHANG)
wpid or return
# The trailing `rescue nil` swallows any error raised while closing the worker.
worker = @children.reap(wpid) and worker.close rescue nil
if worker
@after_worker_exit.call(self, worker, status)
else
# No worker could be matched to this pid.
logger.error("reaped unknown subprocess #{status.inspect}")
end
rescue Errno::ECHILD
# The process has no children left at all.
break
- end while true
+ end
end
def listener_sockets
listener_fds = {}
LISTENERS.each do |sock|
@@ -372,19 +388,10 @@
listener_fds[sock.fileno] = sock
end
listener_fds
end
# Diff note: this helper exists only in 0.1.2; every line below is removed in 0.2.0.
# It walks file descriptor numbers 3..1024, skips those listed in +sockets+,
# and sets close-on-exec on the others.
- def close_sockets_on_exec(sockets)
- (3..1024).each do |io|
- next if sockets.include?(io)
# Numbers for which IO.for_fd raises are skipped.
- io = IO.for_fd(io) rescue next
# autoclose = false: the descriptor stays open when this wrapper object is finalized.
- io.autoclose = false
- io.close_on_exec = true
- end
- end
-
# forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File
def murder_lazy_workers
next_sleep = @timeout - 1
now = time_now.to_i
@children.workers.each do |worker|
@@ -411,21 +418,21 @@
unless REFORKING_AVAILABLE
logger.error("This system doesn't support PR_SET_CHILD_SUBREAPER, can't promote a worker")
end
unless @children.pending_promotion?
- @children.refresh
- if new_mold = @mold_selector.call(self)
+ if new_mold = @children.fresh_workers.first
@children.promote(new_mold)
else
- logger.error("The mold select didn't return a candidate")
+ logger.error("No children at all???")
end
else
end
end
def after_fork_internal
+ @promotion_lock.at_fork
@control_socket[0].close_write # this is master-only, now
@ready_pipe.close if @ready_pipe
Pitchfork::Configurator::RACKUP.clear
@ready_pipe = @init_listeners = nil
@@ -433,13 +440,13 @@
# dying workers can recycle pids
OpenSSL::Random.seed(rand.to_s) if defined?(OpenSSL::Random)
end
def spawn_worker(worker, detach:)
- before_fork.call(self, worker)
+ logger.info("worker=#{worker.nr} gen=#{worker.generation} spawning...")
- pid = fork do
+ pid = Pitchfork.clean_fork do
# We double fork so that the new worker is re-attached back
# to the master.
# This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4
# or the master to be PID 1.
if detach && fork
@@ -465,14 +472,15 @@
end
# Forks the first mold process. The child builds the application and then
# runs mold_loop; the parent stores the child's pid on the Worker object and
# registers it with @children as the mold.
def spawn_initial_mold
# nil: the mold is created without a worker number.
mold = Worker.new(nil)
mold.create_socketpair!
- mold.pid = fork do
- after_fork_internal
+ mold.pid = Pitchfork.clean_fork do
# 0.2.0: the child tries to take the promotion lock here; mold_loop calls
# @promotion_lock.unlock after declaring the promotion.
+ @promotion_lock.try_lock
mold.after_fork_in_child
build_app!
# 0.2.0: listeners are bound in the mold instead of in the master's #start.
+ bind_listeners!
mold_loop(mold)
end
@children.register_mold(mold)
end
@@ -482,13 +490,15 @@
if @children.nr_alive?(worker_nr)
next
end
worker = Pitchfork::Worker.new(worker_nr)
- if !@children.mold || !@children.mold.spawn_worker(worker)
- # If there's no mold, or the mold was somehow unreachable
- # we fallback to spawning the missing workers ourselves.
+ if REFORKING_AVAILABLE
+ unless @children.mold&.spawn_worker(worker)
+ @logger.error("Failed to send a spawn_woker command")
+ end
+ else
spawn_worker(worker, detach: false)
end
# We could directly register workers when we spawn from the
# master, like pitchfork does. However it is preferable to
# always go through the asynchronous registering process for
@@ -502,49 +512,36 @@
# Keeps running the master's monitoring step until @children reports no
# pending workers. In 0.2.0 it returns StopIteration when monitor_loop asks
# for shutdown (0.1.2 only broke out of the loop); otherwise it returns nil
# once the loop condition becomes false.
def wait_for_pending_workers
while @children.pending_workers?
# Wait up to 0.5s on the control socket for a wakeup or a message.
master_sleep(0.5)
# false: monitor_loop must not do its own sleep, we already slept above.
if monitor_loop(false) == StopIteration
- break
+ return StopIteration
end
end
end
# Brings the number of workers back in line with +worker_processes+:
# does nothing when the counts match, spawns the missing workers when there
# are too few, and soft-kills (QUIT) every worker whose number is at or
# above +worker_processes+ when there are too many.
def maintain_worker_count
  surplus = @children.workers_count - worker_processes
  return if surplus.zero?
  return spawn_missing_workers if surplus.negative?

  @children.each_worker do |worker|
    worker.nr >= worker_processes && worker.soft_kill(:QUIT)
  end
end
# Asks at most one outdated worker per call to exit (soft QUIT). A worker is
# outdated when its generation is lower than the current mold's generation.
# Nothing happens when there is no mold, when a worker is still pending, or
# when a worker is already exiting.
#
# Diff note: in 0.1.2 (automatically_refork_workers) the method, when it had
# not returned early, fell through to evaluating @refork_condition and queuing
# :USR2 in the master. 0.2.0 removes that part from this method and keeps only
# the restart logic.
- def automatically_refork_workers
+ def restart_outdated_workers
# If we're already in the middle of forking a new generation, we just continue
- if @children.mold
- # We don't shutdown any outdated worker if any worker is already being spawned
- # or a worker is exiting. Workers are only reforked one by one to minimize the
- # impact on capacity.
- # In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at
- # a time.
- return if @children.pending_workers?
- return if @children.workers.any?(&:exiting?)
+ return unless @children.mold
- if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation }
- logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting")
- outdated_worker.soft_kill(:QUIT)
- return # That's all folks
- end
- end
+ # We don't shutdown any outdated worker if any worker is already being spawned
+ # or a worker is exiting. Workers are only reforked one by one to minimize the
+ # impact on capacity.
+ # In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at
+ # a time.
+ return if @children.pending_workers?
+ return if @children.workers.any?(&:exiting?)
- # If all workers are alive and well, we can consider reforking a new generation
- if @refork_condition
- @children.refresh
- if @refork_condition.met?(@children, logger)
- logger.info("Refork condition met, scheduling a promotion")
- unless @sig_queue.include?(:USR2)
- @sig_queue << :USR2
- awaken_master
- end
- end
# Only the first outdated worker found is signalled on each call.
+ if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation }
+ logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting")
+ outdated_worker.soft_kill(:QUIT)
end
end
# if we get any error, try to write something back to the client
# assuming we haven't closed the socket, but don't get hung up
@@ -665,10 +662,11 @@
readers
end
# Prepares the current process to act as a mold: renames the process with the
# mold's generation, runs the after_promotion hook (added in 0.2.0) and
# installs a QUIT handler. Returns the readers array, which initially contains
# only +worker+.
def init_mold_process(worker)
proc_name "mold (gen: #{worker.generation})"
+ after_promotion.call(self, worker)
readers = [worker]
# NOTE(review): nuke_listeners! is not visible here; presumably it empties
# +readers+ so that the `while readers[0]` loop in mold_loop ends. Confirm.
trap(:QUIT) { nuke_listeners!(readers) }
readers
end
@@ -700,36 +698,51 @@
# but that will return false
client = sock.accept_nonblock(exception: false)
client = false if client == :wait_readable
if client
case client
+ when Message::PromoteWorker
+ if @promotion_lock.try_lock
+ logger.info("Refork asked by master, promoting ourselves")
+ worker.tick = time_now.to_i
+ return worker.promoted!
+ end
when Message
worker.update(client)
else
process_client(client)
worker.increment_requests_count
end
worker.tick = time_now.to_i
end
- return if worker.mold? # We've been promoted we can exit the loop
end
# timeout so we can .tick and keep parent from SIGKILL-ing us
worker.tick = time_now.to_i
+ if @refork_condition && !worker.outdated?
+ if @refork_condition.met?(worker, logger)
+ if @promotion_lock.try_lock
+ logger.info("Refork condition met, promoting ourselves")
+ return worker.promote! # We've been promoted we can exit the loop
+ else
+ # TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock
+ end
+ end
+ end
+
waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
rescue => e
Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
end while readers[0]
end
def mold_loop(mold)
readers = init_mold_process(mold)
waiter = prep_readers(readers)
- mold.acknowlege_promotion(@control_socket[1])
-
+ mold.declare_promotion(@control_socket[1])
+ @promotion_lock.unlock
ready = readers.dup
- # TODO: mold ready callback?
begin
mold.tick = time_now.to_i
while sock = ready.shift
# Pitchfork::Worker#accept_nonblock is not like accept(2) at all,
@@ -737,20 +750,24 @@
message = sock.accept_nonblock(exception: false)
case message
when false
# no message, keep looping
when Message::SpawnWorker
- spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true)
+ begin
+ spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true)
+ rescue => error
+ raise BootFailure, error.message
+ end
else
logger.error("Unexpected mold message #{message.inspect}")
end
end
# timeout so we can .tick and keep parent from SIGKILL-ing us
mold.tick = time_now.to_i
waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
rescue => e
- Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
+ Pitchfork.log_error(@logger, "mold loop error", e) if readers[0]
end while readers[0]
end
# delivers a signal to a worker and fails gracefully if the worker
# is no longer running.