lib/pitchfork/http_server.rb in pitchfork-0.13.0 vs lib/pitchfork/http_server.rb in pitchfork-0.14.0

- old
+ new

@@ -75,11 +75,11 @@ end end # :stopdoc: attr_accessor :app, :timeout, :timeout_signal, :soft_timeout, :cleanup_timeout, :spawn_timeout, :worker_processes, - :before_fork, :after_worker_fork, :after_mold_fork, + :before_fork, :after_worker_fork, :after_mold_fork, :before_service_worker_ready, :before_service_worker_exit, :listener_opts, :children, :orig_app, :config, :ready_pipe, :default_middleware, :early_hints attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete, :refork_condition, :after_worker_timeout, :after_worker_hard_timeout, :after_monitor_ready @@ -93,36 +93,10 @@ # in new projects LISTENERS = [] NOOP = '.' - # :startdoc: - # This Hash is considered a stable interface and changing its contents - # will allow you to switch between different installations of Pitchfork - # or even different installations of the same applications without - # downtime. Keys of this constant Hash are described as follows: - # - # * 0 - the path to the pitchfork executable - # * :argv - a deep copy of the ARGV array the executable originally saw - # * :cwd - the working directory of the application, this is where - # you originally started Pitchfork. - # TODO: Can we get rid of this? - START_CTX = { - :argv => ARGV.map(&:dup), - 0 => $0.dup, - } - # We favor ENV['PWD'] since it is (usually) symlink aware for Capistrano - # and like systems - START_CTX[:cwd] = begin - a = File.stat(pwd = ENV['PWD']) - b = File.stat(Dir.pwd) - a.ino == b.ino && a.dev == b.dev ? pwd : Dir.pwd - rescue - Dir.pwd - end - # :stopdoc: - # Creates a working server on host:port (strange things happen if # port isn't a Number). Use HttpServer::run to start the server and # HttpServer.run.join to join the thread that's processing # incoming requests on the socket. def initialize(app, options = {}) @@ -138,11 +112,11 @@ @init_listeners = options[:listeners].dup || [] options[:use_defaults] = true self.config = Pitchfork::Configurator.new(options) self.listener_opts = {} - proc_name role: 'monitor', status: START_CTX[:argv].join(' ') + proc_name role: 'monitor', status: ARGV.join(' ') # We use @control_socket differently in the master and worker processes: # # * The master process never closes or reinitializes this once # initialized. Signal handlers in the master process will write to @@ -171,11 +145,11 @@ # list of signals we care about and trap in master. @queue_sigs = [ :QUIT, :INT, :TERM, :USR2, :TTIN, :TTOU ] Info.workers_count = worker_processes - SharedMemory.preallocate_drops(worker_processes) + SharedMemory.preallocate_pages(worker_processes) end # Runs the thing. Returns self so you can run join on it def start(sync = true) Pitchfork.enable_child_subreaper # noop if not supported @@ -294,11 +268,11 @@ # one-at-a-time time and we'll happily drop signals in case somebody # is signalling us too often. def join @respawn = true - proc_name role: 'monitor', status: START_CTX[:argv].join(' ') + proc_name role: 'monitor', status: ARGV.join(' ') logger.info "master process ready" # test_exec.rb relies on this message if @ready_pipe begin @ready_pipe.syswrite($$.to_s) @@ -372,10 +346,13 @@ # TODO: should we send a message to the worker to acknowledge? logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} registered" when Message::MoldSpawned new_mold = @children.update(message) logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned") + when Message::ServiceSpawned + new_service = @children.update(message) + logger.info("service pid=#{new_service.pid} gen=#{new_service.generation} spawned") when Message::MoldReady old_molds = @children.molds new_mold = @children.update(message) logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready") old_molds.each do |old_mold| @@ -427,10 +404,24 @@ end end Process.exit end + def service_exit(service) + logger.info "service pid=#{service.pid} gen=#{service.generation} exiting" + proc_name status: "exiting" + + if @before_service_worker_exit + begin + @before_service_worker_exit.call(self, service) + rescue => error + Pitchfork.log_error(logger, "before_service_worker_exit error", error) + end + end + Process.exit + end + def rewindable_input Pitchfork::HttpParser.input_class.method_defined?(:rewind) end def rewindable_input=(bool) @@ -588,10 +579,73 @@ end worker end + def service_loop(service) + readers = init_service_process(service) + waiter = prep_readers(readers) + + ready = readers.dup + + if @before_service_worker_ready + begin + @before_service_worker_ready.call(self, service) + rescue => error + Pitchfork.log_error(logger, "before_service_worker_ready", error) + Process.exit(1) + end + end + + proc_name status: "ready" + + while readers[0] + begin + service.update_deadline(@timeout) + while sock = ready.shift + # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, + # but that will return false + client = sock.accept_nonblock(exception: false) + + case client + when false, :wait_readable + # no message, keep looping + end + + service.update_deadline(@timeout) + end + + # timeout so we can update .deadline and keep parent from SIGKILL-ing us + service.update_deadline(@timeout) + + + waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved + rescue => e + Pitchfork.log_error(@logger, "listen loop error", e) if readers[0] + end + end + end + + def spawn_service(service, detach:) + logger.info("service gen=#{service.generation} spawning...") + + # We set the deadline before spawning the child so that if for some + # reason it gets stuck before reaching the worker loop, + # the monitor process will kill it. + service.update_deadline(@spawn_timeout) + @before_fork&.call(self) + fork_sibling("spawn_service") do + service.pid = Process.pid + + after_fork_internal + service_loop(service) + service_exit(service) + end + + service + end + def spawn_initial_mold mold = Worker.new(nil) mold.create_socketpair! mold.pid = Pitchfork.clean_fork do mold.pid = Process.pid @@ -604,10 +658,25 @@ @promotion_lock.at_fork @children.register_mold(mold) end def spawn_missing_workers + if @before_service_worker_ready && !@children.service + service = Pitchfork::Service.new + if REFORKING_AVAILABLE + service.generation = @children.mold&.generation || 0 + + unless @children.mold&.spawn_service(service) + @logger.error("Failed to send a spawn_service command") + end + else + spawn_service(service, detach: false) + end + + @children.register_service(service) + end + worker_nr = -1 until (worker_nr += 1) == @worker_processes if @children.nr_alive?(worker_nr) next end @@ -641,13 +710,22 @@ end end end def maintain_worker_count - (off = @children.workers_count - worker_processes) == 0 and return - off < 0 and return spawn_missing_workers - @children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:TERM) } + off = @children.workers_count - worker_processes + off -= 1 if @before_service_worker_ready && !@children.service + + if off < 0 + spawn_missing_workers + elsif off > 0 + @children.each_worker do |worker| + if worker.nr >= worker_processes + worker.soft_kill(:TERM) + end + end + end end def restart_outdated_workers # If we're already in the middle of forking a new generation, we just continue return unless @children.mold @@ -656,10 +734,20 @@ # spawned or a worker is exiting. Only 10% of workers can be reforked at # once to minimize the impact on capacity. max_pending_workers = (worker_processes * 0.1).ceil workers_to_restart = max_pending_workers - @children.restarting_workers_count + if service = @children.service + if service.outdated? + if service.soft_kill(:TERM) + logger.info("Sent SIGTERM to service pid=#{service.pid} gen=#{service.generation}") + else + logger.info("Failed to send SIGTERM to service pid=#{service.pid} gen=#{service.generation}") + end + end + end + if workers_to_restart > 0 outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation } outdated_workers.each do |worker| if worker.soft_kill(:TERM) logger.info("Sent SIGTERM to worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation}") @@ -699,33 +787,33 @@ def e103_response_write(client, headers) rss = @request.response_start_sent buf = (rss ? "103 Early Hints\r\n" : "HTTP/1.1 103 Early Hints\r\n").b headers.each { |key, value| append_header(buf, key, value) } - buf << (rss ? "\r\nHTTP/1.1 ".freeze : "\r\n".freeze) + buf << (rss ? "\r\nHTTP/1.1 " : "\r\n") client.write(buf) end def e100_response_write(client, env) - # We use String#freeze to avoid allocations under Ruby 2.1+ - # Not many users hit this code path, so it's better to reduce the - # constant table sizes even for Ruby 2.0 users who'll hit extra - # allocations here. client.write(@request.response_start_sent ? - "100 Continue\r\n\r\nHTTP/1.1 ".freeze : - "HTTP/1.1 100 Continue\r\n\r\n".freeze) + "100 Continue\r\n\r\nHTTP/1.1 " : + "HTTP/1.1 100 Continue\r\n\r\n") env.delete('HTTP_EXPECT') end # once a client is accepted, it is processed in its entirety here # in 3 easy steps: read request, call app, write app response def process_client(client, worker, timeout_handler) env = nil @request = Pitchfork::HttpParser.new env = @request.read(client) - proc_name status: "requests: #{worker.requests_count}, processing: #{env["PATH_INFO"]}" + status = "requests: #{worker.requests_count}, processing: #{env["PATH_INFO"]}" + if request_id = env["HTTP_X_REQUEST_ID"] + status += ", request_id: #{request_id}" + end + proc_name status: status env["pitchfork.worker"] = worker timeout_handler.rack_env = env env["pitchfork.timeout"] = timeout_handler @@ -815,10 +903,22 @@ trap(:TERM) { nuke_listeners!(readers) } trap(:INT) { nuke_listeners!(readers); exit!(0) } readers end + def init_service_process(service) + proc_name role: "(gen:#{service.generation}) mold", status: "init" + LISTENERS.each(&:close) # Don't appear as listening to incoming requests + service.register_to_master(@control_socket[1]) + readers = [service] + trap(:QUIT) { nuke_listeners!(readers) } + trap(:TERM) { nuke_listeners!(readers) } + trap(:INT) { nuke_listeners!(readers); exit!(0) } + proc_name role: "(gen:#{service.generation}) service", status: "ready" + readers + end + def init_mold_process(mold) proc_name role: "(gen:#{mold.generation}) mold", status: "init" after_mold_fork.call(self, mold) readers = [mold] trap(:QUIT) { nuke_listeners!(readers) } @@ -828,11 +928,11 @@ readers end if Pitchfork.const_defined?(:Waiter) def prep_readers(readers) - Pitchfork::Waiter.prep_readers(readers) + Pitchfork::Info.keep_io(Pitchfork::Waiter.prep_readers(readers)) end else require_relative 'select_waiter' def prep_readers(_readers) Pitchfork::SelectWaiter.new @@ -856,28 +956,29 @@ worker.update_deadline(@timeout) while sock = ready.shift # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, # but that will return false client = sock.accept_nonblock(exception: false) - client = false if client == :wait_readable - if client - case client - when Message::PromoteWorker - if Info.fork_safe? - spawn_mold(worker) - else - logger.error("worker=#{worker.nr} gen=#{worker.generation} is no longer fork safe, can't refork") - end - when Message - worker.update(client) + + case client + when false, :wait_readable + # no message, keep looping + when Message::PromoteWorker + if Info.fork_safe? + spawn_mold(worker) else - request_env = process_client(client, worker, prepare_timeout(worker)) - @after_request_complete&.call(self, worker, request_env) - worker.increment_requests_count + logger.error("worker=#{worker.nr} gen=#{worker.generation} is no longer fork safe, can't refork") end - worker.update_deadline(@timeout) + when Message + worker.update(client) + else + request_env = process_client(client, worker, prepare_timeout(worker)) + worker.increment_requests_count + @after_request_complete&.call(self, worker, request_env) end + + worker.update_deadline(@timeout) end # timeout so we can update .deadline and keep parent from SIGKILL-ing us worker.update_deadline(@timeout) @@ -956,10 +1057,26 @@ Process.exit(1) end rescue => error raise BootFailure, error.message end + when Message::SpawnService + retries = 1 + begin + spawn_service(Service.new(generation: mold.generation), detach: true) + rescue ForkFailure + if retries > 0 + @logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a service. Retrying.") + retries -= 1 + retry + else + @logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a service twice in a row. Corrupted mold process?") + Process.exit(1) + end + rescue => error + raise BootFailure, error.message + end else logger.error("Unexpected mold message #{message.inspect}") end end @@ -1002,18 +1119,17 @@ def proc_name(role: nil, status: nil) @proctitle_role = role if role @proctitle_status = status if status - Process.setproctitle("#{File.basename(START_CTX[0])} #{@proctitle_role} - #{@proctitle_status}") + Process.setproctitle("#{File.basename($PROGRAM_NAME)} #{@proctitle_role} - #{@proctitle_status}") end def bind_listeners! listeners = config[:listeners].dup if listeners.empty? listeners << Pitchfork::Const::DEFAULT_LISTEN @init_listeners << Pitchfork::Const::DEFAULT_LISTEN - START_CTX[:argv] << "-l#{Pitchfork::Const::DEFAULT_LISTEN}" end listeners.each { |addr| listen(addr) } raise ArgumentError, "no listeners" if LISTENERS.empty? end