lib/pitchfork/http_server.rb in pitchfork-0.13.0 vs lib/pitchfork/http_server.rb in pitchfork-0.14.0
- old
+ new
@@ -75,11 +75,11 @@
end
end
# :stopdoc:
attr_accessor :app, :timeout, :timeout_signal, :soft_timeout, :cleanup_timeout, :spawn_timeout, :worker_processes,
- :before_fork, :after_worker_fork, :after_mold_fork,
+ :before_fork, :after_worker_fork, :after_mold_fork, :before_service_worker_ready, :before_service_worker_exit,
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete,
:refork_condition, :after_worker_timeout, :after_worker_hard_timeout, :after_monitor_ready
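0.14.0 introduces an optional service worker: one extra child process that boots the application but never accepts requests (its listeners are closed in init_service_process below). The two accessors added above are its lifecycle hooks. A hedged sketch of configuring them in pitchfork.conf.rb, assuming the configurator exposes them under the same names as the accessors; the (server, service) signature is taken from the call sites further down, and MetricsFlusher is a hypothetical stand-in:

    before_service_worker_ready do |server, service|
      # work that should run in exactly one long-lived, non-serving process
      $flusher = Thread.new { MetricsFlusher.run }
    end

    before_service_worker_exit do |server, service|
      $flusher&.kill
    end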
@@ -93,36 +93,10 @@
# in new projects
LISTENERS = []
NOOP = '.'
- # :startdoc:
- # This Hash is considered a stable interface and changing its contents
- # will allow you to switch between different installations of Pitchfork
- # or even different installations of the same applications without
- # downtime. Keys of this constant Hash are described as follows:
- #
- # * 0 - the path to the pitchfork executable
- # * :argv - a deep copy of the ARGV array the executable originally saw
- # * :cwd - the working directory of the application, this is where
- # you originally started Pitchfork.
- # TODO: Can we get rid of this?
- START_CTX = {
- :argv => ARGV.map(&:dup),
- 0 => $0.dup,
- }
- # We favor ENV['PWD'] since it is (usually) symlink aware for Capistrano
- # and like systems
- START_CTX[:cwd] = begin
- a = File.stat(pwd = ENV['PWD'])
- b = File.stat(Dir.pwd)
- a.ino == b.ino && a.dev == b.dev ? pwd : Dir.pwd
- rescue
- Dir.pwd
- end
- # :stopdoc:
-
# Creates a working server on host:port (strange things happen if
# port isn't a Number). Use HttpServer::run to start the server and
# HttpServer.run.join to join the thread that's processing
# incoming requests on the socket.
def initialize(app, options = {})
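The deleted START_CTX hash existed to support switching installations without downtime (the re-execution mechanism inherited from unicorn), which pitchfork's fork-from-mold model no longer needs. Its one non-obvious trick is worth noting: prefer ENV['PWD'] over Dir.pwd when both name the same directory, so a symlinked deploy path (e.g. Capistrano's current) stays unresolved. The removed logic, restated standalone:

    # Returns ENV['PWD'] when it points at the same directory as Dir.pwd
    # (same inode on the same device), otherwise falls back to Dir.pwd.
    def symlink_aware_pwd
      pwd = ENV['PWD']
      a = File.stat(pwd)
      b = File.stat(Dir.pwd)
      a.ino == b.ino && a.dev == b.dev ? pwd : Dir.pwd
    rescue
      Dir.pwd
    end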
@@ -138,11 +112,11 @@
@init_listeners = options[:listeners].dup || []
options[:use_defaults] = true
self.config = Pitchfork::Configurator.new(options)
self.listener_opts = {}
- proc_name role: 'monitor', status: START_CTX[:argv].join(' ')
+ proc_name role: 'monitor', status: ARGV.join(' ')
# We use @control_socket differently in the master and worker processes:
#
# * The master process never closes or reinitializes this once
# initialized. Signal handlers in the master process will write to
@@ -171,11 +145,11 @@
# list of signals we care about and trap in master.
@queue_sigs = [
:QUIT, :INT, :TERM, :USR2, :TTIN, :TTOU ]
Info.workers_count = worker_processes
- SharedMemory.preallocate_drops(worker_processes)
+ SharedMemory.preallocate_pages(worker_processes)
end
# Runs the thing. Returns self so you can run join on it
def start(sync = true)
Pitchfork.enable_child_subreaper # noop if not supported
@@ -294,11 +268,11 @@
# one-at-a-time, and we'll happily drop signals in case somebody
# is signalling us too often.
def join
@respawn = true
- proc_name role: 'monitor', status: START_CTX[:argv].join(' ')
+ proc_name role: 'monitor', status: ARGV.join(' ')
logger.info "master process ready" # test_exec.rb relies on this message
if @ready_pipe
begin
@ready_pipe.syswrite($$.to_s)
@@ -372,10 +346,13 @@
# TODO: should we send a message to the worker to acknowledge?
logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} registered"
when Message::MoldSpawned
new_mold = @children.update(message)
logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned")
+ when Message::ServiceSpawned
+ new_service = @children.update(message)
+ logger.info("service pid=#{new_service.pid} gen=#{new_service.generation} spawned")
when Message::MoldReady
old_molds = @children.molds
new_mold = @children.update(message)
logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready")
old_molds.each do |old_mold|
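Message::ServiceSpawned is the service-flavored twin of MoldSpawned: a freshly forked child announces itself over the control socket, and the monitor folds the announcement into its child registry via @children.update. A minimal sketch of that announcement pattern, not pitchfork's actual wire format (see its Message classes), just the shape of the idea:

    require 'socket'

    Spawned = Struct.new(:pid, :generation)

    monitor_io, child_io = UNIXSocket.socketpair
    if fork
      child_io.close
      msg = Marshal.load(monitor_io.recv(4096))  # monitor side: update registry
      puts "service pid=#{msg.pid} gen=#{msg.generation} spawned"
      Process.wait
    else
      monitor_io.close
      child_io.send(Marshal.dump(Spawned.new(Process.pid, 1)), 0)  # child side
      exit!(0)
    end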
@@ -427,10 +404,24 @@
end
end
Process.exit
end
+ def service_exit(service)
+ logger.info "service pid=#{service.pid} gen=#{service.generation} exiting"
+ proc_name status: "exiting"
+
+ if @before_service_worker_exit
+ begin
+ @before_service_worker_exit.call(self, service)
+ rescue => error
+ Pitchfork.log_error(logger, "before_service_worker_exit error", error)
+ end
+ end
+ Process.exit
+ end
+
def rewindable_input
Pitchfork::HttpParser.input_class.method_defined?(:rewind)
end
def rewindable_input=(bool)
@@ -588,10 +579,73 @@
end
worker
end
+ def service_loop(service)
+ readers = init_service_process(service)
+ waiter = prep_readers(readers)
+
+ ready = readers.dup
+
+ if @before_service_worker_ready
+ begin
+ @before_service_worker_ready.call(self, service)
+ rescue => error
+ Pitchfork.log_error(logger, "before_service_worker_ready error", error)
+ Process.exit(1)
+ end
+ end
+
+ proc_name status: "ready"
+
+ while readers[0]
+ begin
+ service.update_deadline(@timeout)
+ while sock = ready.shift
+ # Pitchfork::Worker#accept_nonblock is not like accept(2) at all:
+ # it returns monitor messages, or false/:wait_readable when there is
+ # nothing to read
+ client = sock.accept_nonblock(exception: false)
+
+ case client
+ when false, :wait_readable
+ # no message, keep looping
+ end
+
+ service.update_deadline(@timeout)
+ end
+
+ # timeout so we can update .deadline and keep parent from SIGKILL-ing us
+ service.update_deadline(@timeout)
+
+ waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
+ rescue => e
+ Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
+ end
+ end
+ end
+
+ def spawn_service(service, detach:)
+ logger.info("service gen=#{service.generation} spawning...")
+
+ # We set the deadline before spawning the child so that if for some
+ # reason it gets stuck before reaching the service loop,
+ # the monitor process will kill it.
+ service.update_deadline(@spawn_timeout)
+ @before_fork&.call(self)
+ fork_sibling("spawn_service") do
+ service.pid = Process.pid
+
+ after_fork_internal
+ service_loop(service)
+ service_exit(service)
+ end
+
+ service
+ end
+
def spawn_initial_mold
mold = Worker.new(nil)
mold.create_socketpair!
mold.pid = Pitchfork.clean_fork do
mold.pid = Process.pid
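service_loop and spawn_service reuse the worker watchdog: the child keeps writing "now + timeout" somewhere the monitor can read (the SharedMemory pages preallocated in the initializer), and the monitor SIGKILLs any child whose deadline has passed. Hence update_deadline around every blocking step, and a deadline set before the fork so a child stuck during boot still gets reaped. A minimal single-process restatement of the technique (the real deadlines live in shared memory so they cross the fork boundary):

    class Deadline
      CLOCK = Process::CLOCK_MONOTONIC

      # child side: call at the top of every loop iteration (the heartbeat)
      def update(timeout)
        @at = Process.clock_gettime(CLOCK) + timeout
      end

      # monitor side: a child that stopped beating its deadline is stuck
      def expired?
        @at ? Process.clock_gettime(CLOCK) > @at : false
      end
    end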
@@ -604,10 +658,25 @@
@promotion_lock.at_fork
@children.register_mold(mold)
end
def spawn_missing_workers
+ if @before_service_worker_ready && !@children.service
+ service = Pitchfork::Service.new
+ if REFORKING_AVAILABLE
+ service.generation = @children.mold&.generation || 0
+
+ unless @children.mold&.spawn_service(service)
+ @logger.error("Failed to send a spawn_service command")
+ end
+ else
+ spawn_service(service, detach: false)
+ end
+
+ @children.register_service(service)
+ end
+
worker_nr = -1
until (worker_nr += 1) == @worker_processes
if @children.nr_alive?(worker_nr)
next
end
@@ -641,13 +710,22 @@
end
end
end
def maintain_worker_count
- (off = @children.workers_count - worker_processes) == 0 and return
- off < 0 and return spawn_missing_workers
- @children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:TERM) }
+ off = @children.workers_count - worker_processes
+ off -= 1 if @before_service_worker_ready && !@children.service
+
+ if off < 0
+ spawn_missing_workers
+ elsif off > 0
+ @children.each_worker do |worker|
+ if worker.nr >= worker_processes
+ worker.soft_kill(:TERM)
+ end
+ end
+ end
end
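The rewritten accounting treats a configured-but-missing service worker as one more missing child: off is biased one lower, so spawn_missing_workers (which also boots the service) runs. A worked example with assumed values:

    worker_processes = 4
    live_workers     = 4      # @children.workers_count
    service_missing  = true   # hook configured, @children.service is nil

    off = live_workers - worker_processes   # => 0
    off -= 1 if service_missing             # => -1, negative, so spawn_missing_workers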
def restart_outdated_workers
# If we're already in the middle of forking a new generation, we just continue
return unless @children.mold
@@ -656,10 +734,20 @@
# spawned or a worker is exiting. Only 10% of workers can be reforked at
# once to minimize the impact on capacity.
max_pending_workers = (worker_processes * 0.1).ceil
workers_to_restart = max_pending_workers - @children.restarting_workers_count
+ if service = @children.service
+ if service.outdated?
+ if service.soft_kill(:TERM)
+ logger.info("Sent SIGTERM to service pid=#{service.pid} gen=#{service.generation}")
+ else
+ logger.info("Failed to send SIGTERM to service pid=#{service.pid} gen=#{service.generation}")
+ end
+ end
+ end
+
if workers_to_restart > 0
outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation }
outdated_workers.each do |worker|
if worker.soft_kill(:TERM)
logger.info("Sent SIGTERM to worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation}")
@@ -699,33 +787,33 @@
def e103_response_write(client, headers)
rss = @request.response_start_sent
buf = (rss ? "103 Early Hints\r\n" : "HTTP/1.1 103 Early Hints\r\n").b
headers.each { |key, value| append_header(buf, key, value) }
- buf << (rss ? "\r\nHTTP/1.1 ".freeze : "\r\n".freeze)
+ buf << (rss ? "\r\nHTTP/1.1 " : "\r\n")
client.write(buf)
end
def e100_response_write(client, env)
- # We use String#freeze to avoid allocations under Ruby 2.1+
- # Not many users hit this code path, so it's better to reduce the
- # constant table sizes even for Ruby 2.0 users who'll hit extra
- # allocations here.
client.write(@request.response_start_sent ?
- "100 Continue\r\n\r\nHTTP/1.1 ".freeze :
- "HTTP/1.1 100 Continue\r\n\r\n".freeze)
+ "100 Continue\r\n\r\nHTTP/1.1 " :
+ "HTTP/1.1 100 Continue\r\n\r\n")
env.delete('HTTP_EXPECT')
end
# once a client is accepted, it is processed in its entirety here
# in 3 easy steps: read request, call app, write app response
def process_client(client, worker, timeout_handler)
env = nil
@request = Pitchfork::HttpParser.new
env = @request.read(client)
- proc_name status: "requests: #{worker.requests_count}, processing: #{env["PATH_INFO"]}"
+ status = "requests: #{worker.requests_count}, processing: #{env["PATH_INFO"]}"
+ if request_id = env["HTTP_X_REQUEST_ID"]
+ status += ", request_id: #{request_id}"
+ end
+ proc_name status: status
env["pitchfork.worker"] = worker
timeout_handler.rack_env = env
env["pitchfork.timeout"] = timeout_handler
@@ -815,10 +903,22 @@
trap(:TERM) { nuke_listeners!(readers) }
trap(:INT) { nuke_listeners!(readers); exit!(0) }
readers
end
+ def init_service_process(service)
+ proc_name role: "(gen:#{service.generation}) service", status: "init"
+ LISTENERS.each(&:close) # Don't appear as listening to incoming requests
+ service.register_to_master(@control_socket[1])
+ readers = [service]
+ trap(:QUIT) { nuke_listeners!(readers) }
+ trap(:TERM) { nuke_listeners!(readers) }
+ trap(:INT) { nuke_listeners!(readers); exit!(0) }
+ proc_name role: "(gen:#{service.generation}) service", status: "ready"
+ readers
+ end
+
def init_mold_process(mold)
proc_name role: "(gen:#{mold.generation}) mold", status: "init"
after_mold_fork.call(self, mold)
readers = [mold]
trap(:QUIT) { nuke_listeners!(readers) }
@@ -828,11 +928,11 @@
readers
end
if Pitchfork.const_defined?(:Waiter)
def prep_readers(readers)
- Pitchfork::Waiter.prep_readers(readers)
+ Pitchfork::Info.keep_io(Pitchfork::Waiter.prep_readers(readers))
end
else
require_relative 'select_waiter'
def prep_readers(_readers)
Pitchfork::SelectWaiter.new
@@ -856,28 +956,29 @@
worker.update_deadline(@timeout)
while sock = ready.shift
# Pitchfork::Worker#accept_nonblock is not like accept(2) at all:
# it can return a Message from the monitor, a client socket, or
# false/:wait_readable when there is nothing to read
client = sock.accept_nonblock(exception: false)
- client = false if client == :wait_readable
- if client
- case client
- when Message::PromoteWorker
- if Info.fork_safe?
- spawn_mold(worker)
- else
- logger.error("worker=#{worker.nr} gen=#{worker.generation} is no longer fork safe, can't refork")
- end
- when Message
- worker.update(client)
+
+ case client
+ when false, :wait_readable
+ # no message, keep looping
+ when Message::PromoteWorker
+ if Info.fork_safe?
+ spawn_mold(worker)
else
- request_env = process_client(client, worker, prepare_timeout(worker))
- @after_request_complete&.call(self, worker, request_env)
- worker.increment_requests_count
+ logger.error("worker=#{worker.nr} gen=#{worker.generation} is no longer fork safe, can't refork")
end
- worker.update_deadline(@timeout)
+ when Message
+ worker.update(client)
+ else
+ request_env = process_client(client, worker, prepare_timeout(worker))
+ worker.increment_requests_count
+ @after_request_complete&.call(self, worker, request_env)
end
+
+ worker.update_deadline(@timeout)
end
# timeout so we can update .deadline and keep parent from SIGKILL-ing us
worker.update_deadline(@timeout)
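Beyond the flattened case dispatch, note the reordering: increment_requests_count now runs before the after_request_complete callback, so the count the callback observes already includes the request that just finished. That makes worker-recycling hooks straightforward. A sketch, assuming the hook is set through the configurator like the other callbacks, that a plain exit is an acceptable way to retire a worker, and with an invented threshold:

    after_request_complete do |server, worker, env|
      # recycle this worker once it has served 1000 requests
      exit if worker.requests_count >= 1000
    end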
@@ -956,10 +1057,26 @@
Process.exit(1)
end
rescue => error
raise BootFailure, error.message
end
+ when Message::SpawnService
+ retries = 1
+ begin
+ spawn_service(Service.new(generation: mold.generation), detach: true)
+ rescue ForkFailure
+ if retries > 0
+ @logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a service. Retrying.")
+ retries -= 1
+ retry
+ else
+ @logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a service twice in a row. Corrupted mold process?")
+ Process.exit(1)
+ end
+ rescue => error
+ raise BootFailure, error.message
+ end
else
logger.error("Unexpected mold message #{message.inspect}")
end
end
@@ -1002,18 +1119,17 @@
def proc_name(role: nil, status: nil)
@proctitle_role = role if role
@proctitle_status = status if status
- Process.setproctitle("#{File.basename(START_CTX[0])} #{@proctitle_role} - #{@proctitle_status}")
+ Process.setproctitle("#{File.basename($PROGRAM_NAME)} #{@proctitle_role} - #{@proctitle_status}")
end
def bind_listeners!
listeners = config[:listeners].dup
if listeners.empty?
listeners << Pitchfork::Const::DEFAULT_LISTEN
@init_listeners << Pitchfork::Const::DEFAULT_LISTEN
- START_CTX[:argv] << "-l#{Pitchfork::Const::DEFAULT_LISTEN}"
end
listeners.each { |addr| listen(addr) }
raise ArgumentError, "no listeners" if LISTENERS.empty?
end