lib/pitchfork/http_server.rb in pitchfork-0.1.2 vs lib/pitchfork/http_server.rb in pitchfork-0.2.0

- old
+ new

@@ -1,21 +1,22 @@ # -*- encoding: binary -*- require 'pitchfork/pitchfork_http' +require 'pitchfork/flock' module Pitchfork # This is the process manager of Pitchfork. This manages worker # processes which in turn handle the I/O and application process. # Listener sockets are started in the master process and shared with # forked worker children. class HttpServer # :stopdoc: attr_accessor :app, :timeout, :worker_processes, - :before_fork, :after_fork, + :after_fork, :after_promotion, :listener_opts, :children, :orig_app, :config, :ready_pipe, :default_middleware, :early_hints - attr_writer :after_worker_exit, :after_worker_ready, :refork_condition, :mold_selector + attr_writer :after_worker_exit, :after_worker_ready, :refork_condition attr_reader :logger include Pitchfork::SocketHelper include Pitchfork::HttpResponse @@ -25,11 +26,10 @@ LISTENERS = [] NOOP = '.' REFORKING_AVAILABLE = Pitchfork::CHILD_SUBREAPER_AVAILABLE || Process.pid == 1 - MAX_SLEEP = 1 # seconds # :startdoc: # This Hash is considered a stable interface and changing its contents # will allow you to switch between different installations of Pitchfork # or even different installations of the same applications without @@ -58,17 +58,19 @@ # Creates a working server on host:port (strange things happen if # port isn't a Number). Use HttpServer::run to start the server and # HttpServer.run.join to join the thread that's processing # incoming requests on the socket. def initialize(app, options = {}) + @exit_status = 0 @app = app @respawn = false @last_check = time_now - @default_middleware = true + @promotion_lock = Flock.new("pitchfork-promotion") + options = options.dup @ready_pipe = options.delete(:ready_pipe) - @init_listeners = options[:listeners] ? options[:listeners].dup : [] + @init_listeners = options[:listeners].dup || [] options[:use_defaults] = true self.config = Pitchfork::Configurator.new(options) self.listener_opts = {} # We use @control_socket differently in the master and worker processes: @@ -102,11 +104,11 @@ :QUIT, :INT, :TERM, :USR2, :TTIN, :TTOU ] Worker.preallocate_drops(worker_processes) end # Runs the thing. Returns self so you can run join on it - def start + def start(sync = true) Pitchfork.enable_child_subreaper # noop if not supported # This socketpair is used to wake us up from select(2) in #join when signals # are trapped. See trap_deferred. # It's also used by newly spawned children to send their soft_signal pipe @@ -118,26 +120,29 @@ # trigger happy and send signals as soon as the pid file exists. # Note that signals don't actually get handled until the #join method @queue_sigs.each { |sig| trap(sig) { @sig_queue << sig; awaken_master } } trap(:CHLD) { awaken_master } - bind_listeners! if REFORKING_AVAILABLE spawn_initial_mold wait_for_pending_workers unless @children.mold raise BootFailure, "The initial mold failed to boot" end else build_app! + bind_listeners! + after_promotion.call(self, Worker.new(nil, pid: $$).promoted!) end - spawn_missing_workers - # We could just return here as we'd register them later in #join. - # However a good part of the test suite assumes #start only return - # once all initial workers are spawned. - wait_for_pending_workers + if sync + spawn_missing_workers + # We could just return here as we'd register them later in #join. + # However a good part of the test suite assumes #start only return + # once all initial workers are spawned. + wait_for_pending_workers + end self end # replaces current listener set with +listeners+. This will @@ -235,14 +240,23 @@ Pitchfork.log_error(@logger, "master loop error", e) end end stop # gracefully shutdown all workers on our way out logger.info "master complete" + @exit_status end def monitor_loop(sleep = true) reap_all_workers + + if REFORKING_AVAILABLE && @respawn && @children.molds.empty? + logger.info("No mold alive, shutting down") + @exit_status = 1 + @sig_queue << :QUIT + @respawn = false + end + case message = @sig_queue.shift when nil # avoid murdering workers after our master process (or the # machine) comes out of suspend/hibernation if (@last_check + @timeout) >= (@last_check = time_now) @@ -251,17 +265,19 @@ sleep_time = @timeout/2.0 + 1 @logger.debug("waiting #{sleep_time}s after suspend/hibernation") end if @respawn maintain_worker_count - automatically_refork_workers if REFORKING_AVAILABLE + restart_outdated_workers if REFORKING_AVAILABLE end master_sleep(sleep_time) if sleep when :QUIT # graceful shutdown + logger.info "QUIT received, starting graceful shutdown" return StopIteration when :TERM, :INT # immediate shutdown + logger.info "#{message} received, starting immediate shutdown" stop(false) return StopIteration when :USR2 # trigger a promotion trigger_refork when :TTIN @@ -288,10 +304,11 @@ end end # Terminates all workers, but does not exit master process def stop(graceful = true) + wait_for_pending_workers self.listeners = [] limit = time_now + timeout until @children.workers.empty? || time_now > limit if graceful soft_kill_each_child(:QUIT) @@ -300,10 +317,11 @@ end sleep(0.1) reap_all_workers end kill_each_child(:KILL) + @promotion_lock.unlink end def rewindable_input Pitchfork::HttpParser.input_class.method_defined?(:rewind) end @@ -331,12 +349,10 @@ private # wait for a signal handler to wake us up and then consume the pipe def master_sleep(sec) - sec = MAX_SLEEP if sec > MAX_SLEEP - @control_socket[0].wait(sec) or return case message = @control_socket[0].recvmsg_nonblock(exception: false) when :wait_readable, NOOP nil else @@ -349,22 +365,22 @@ @control_socket[1].sendmsg_nonblock(NOOP, exception: false) # wakeup master process from select end # reaps all unreaped workers def reap_all_workers - begin + loop do wpid, status = Process.waitpid2(-1, Process::WNOHANG) wpid or return worker = @children.reap(wpid) and worker.close rescue nil if worker @after_worker_exit.call(self, worker, status) else logger.error("reaped unknown subprocess #{status.inspect}") end rescue Errno::ECHILD break - end while true + end end def listener_sockets listener_fds = {} LISTENERS.each do |sock| @@ -372,19 +388,10 @@ listener_fds[sock.fileno] = sock end listener_fds end - def close_sockets_on_exec(sockets) - (3..1024).each do |io| - next if sockets.include?(io) - io = IO.for_fd(io) rescue next - io.autoclose = false - io.close_on_exec = true - end - end - # forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File def murder_lazy_workers next_sleep = @timeout - 1 now = time_now.to_i @children.workers.each do |worker| @@ -411,21 +418,21 @@ unless REFORKING_AVAILABLE logger.error("This system doesn't support PR_SET_CHILD_SUBREAPER, can't promote a worker") end unless @children.pending_promotion? - @children.refresh - if new_mold = @mold_selector.call(self) + if new_mold = @children.fresh_workers.first @children.promote(new_mold) else - logger.error("The mold select didn't return a candidate") + logger.error("No children at all???") end else end end def after_fork_internal + @promotion_lock.at_fork @control_socket[0].close_write # this is master-only, now @ready_pipe.close if @ready_pipe Pitchfork::Configurator::RACKUP.clear @ready_pipe = @init_listeners = nil @@ -433,13 +440,13 @@ # dying workers can recycle pids OpenSSL::Random.seed(rand.to_s) if defined?(OpenSSL::Random) end def spawn_worker(worker, detach:) - before_fork.call(self, worker) + logger.info("worker=#{worker.nr} gen=#{worker.generation} spawning...") - pid = fork do + pid = Pitchfork.clean_fork do # We double fork so that the new worker is re-attached back # to the master. # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4 # or the master to be PID 1. if detach && fork @@ -465,14 +472,15 @@ end def spawn_initial_mold mold = Worker.new(nil) mold.create_socketpair! - mold.pid = fork do - after_fork_internal + mold.pid = Pitchfork.clean_fork do + @promotion_lock.try_lock mold.after_fork_in_child build_app! + bind_listeners! mold_loop(mold) end @children.register_mold(mold) end @@ -482,13 +490,15 @@ if @children.nr_alive?(worker_nr) next end worker = Pitchfork::Worker.new(worker_nr) - if !@children.mold || !@children.mold.spawn_worker(worker) - # If there's no mold, or the mold was somehow unreachable - # we fallback to spawning the missing workers ourselves. + if REFORKING_AVAILABLE + unless @children.mold&.spawn_worker(worker) + @logger.error("Failed to send a spawn_woker command") + end + else spawn_worker(worker, detach: false) end # We could directly register workers when we spawn from the # master, like pitchfork does. However it is preferable to # always go through the asynchronous registering process for @@ -502,49 +512,36 @@ def wait_for_pending_workers while @children.pending_workers? master_sleep(0.5) if monitor_loop(false) == StopIteration - break + return StopIteration end end end def maintain_worker_count (off = @children.workers_count - worker_processes) == 0 and return off < 0 and return spawn_missing_workers @children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:QUIT) } end - def automatically_refork_workers + def restart_outdated_workers # If we're already in the middle of forking a new generation, we just continue - if @children.mold - # We don't shutdown any outdated worker if any worker is already being spawned - # or a worker is exiting. Workers are only reforked one by one to minimize the - # impact on capacity. - # In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at - # a time. - return if @children.pending_workers? - return if @children.workers.any?(&:exiting?) + return unless @children.mold - if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation } - logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting") - outdated_worker.soft_kill(:QUIT) - return # That's all folks - end - end + # We don't shutdown any outdated worker if any worker is already being spawned + # or a worker is exiting. Workers are only reforked one by one to minimize the + # impact on capacity. + # In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at + # a time. + return if @children.pending_workers? + return if @children.workers.any?(&:exiting?) - # If all workers are alive and well, we can consider reforking a new generation - if @refork_condition - @children.refresh - if @refork_condition.met?(@children, logger) - logger.info("Refork condition met, scheduling a promotion") - unless @sig_queue.include?(:USR2) - @sig_queue << :USR2 - awaken_master - end - end + if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation } + logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting") + outdated_worker.soft_kill(:QUIT) end end # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up @@ -665,10 +662,11 @@ readers end def init_mold_process(worker) proc_name "mold (gen: #{worker.generation})" + after_promotion.call(self, worker) readers = [worker] trap(:QUIT) { nuke_listeners!(readers) } readers end @@ -700,36 +698,51 @@ # but that will return false client = sock.accept_nonblock(exception: false) client = false if client == :wait_readable if client case client + when Message::PromoteWorker + if @promotion_lock.try_lock + logger.info("Refork asked by master, promoting ourselves") + worker.tick = time_now.to_i + return worker.promoted! + end when Message worker.update(client) else process_client(client) worker.increment_requests_count end worker.tick = time_now.to_i end - return if worker.mold? # We've been promoted we can exit the loop end # timeout so we can .tick and keep parent from SIGKILL-ing us worker.tick = time_now.to_i + if @refork_condition && !worker.outdated? + if @refork_condition.met?(worker, logger) + if @promotion_lock.try_lock + logger.info("Refork condition met, promoting ourselves") + return worker.promote! # We've been promoted we can exit the loop + else + # TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock + end + end + end + waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved rescue => e Pitchfork.log_error(@logger, "listen loop error", e) if readers[0] end while readers[0] end def mold_loop(mold) readers = init_mold_process(mold) waiter = prep_readers(readers) - mold.acknowlege_promotion(@control_socket[1]) - + mold.declare_promotion(@control_socket[1]) + @promotion_lock.unlock ready = readers.dup - # TODO: mold ready callback? begin mold.tick = time_now.to_i while sock = ready.shift # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, @@ -737,20 +750,24 @@ message = sock.accept_nonblock(exception: false) case message when false # no message, keep looping when Message::SpawnWorker - spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true) + begin + spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true) + rescue => error + raise BootFailure, error.message + end else logger.error("Unexpected mold message #{message.inspect}") end end # timeout so we can .tick and keep parent from SIGKILL-ing us mold.tick = time_now.to_i waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved rescue => e - Pitchfork.log_error(@logger, "listen loop error", e) if readers[0] + Pitchfork.log_error(@logger, "mold loop error", e) if readers[0] end while readers[0] end # delivers a signal to a worker and fails gracefully if the worker # is no longer running.