lib/pitchfork/http_server.rb in pitchfork-0.2.0 vs lib/pitchfork/http_server.rb in pitchfork-0.3.0

- old
+ new

@@ -8,11 +8,11 @@ # Listener sockets are started in the master process and shared with # forked worker children. class HttpServer # :stopdoc: attr_accessor :app, :timeout, :worker_processes, - :after_fork, :after_promotion, + :after_worker_fork, :after_mold_fork, :listener_opts, :children, :orig_app, :config, :ready_pipe, :default_middleware, :early_hints attr_writer :after_worker_exit, :after_worker_ready, :refork_condition @@ -25,12 +25,10 @@ # in new projects LISTENERS = [] NOOP = '.' - REFORKING_AVAILABLE = Pitchfork::CHILD_SUBREAPER_AVAILABLE || Process.pid == 1 - # :startdoc: # This Hash is considered a stable interface and changing its contents # will allow you to switch between different installations of Pitchfork # or even different installations of the same applications without # downtime. Keys of this constant Hash are described as follows: @@ -61,11 +59,11 @@ # incoming requests on the socket. def initialize(app, options = {}) @exit_status = 0 @app = app @respawn = false - @last_check = time_now + @last_check = Pitchfork.time_now @promotion_lock = Flock.new("pitchfork-promotion") options = options.dup @ready_pipe = options.delete(:ready_pipe) @init_listeners = options[:listeners].dup || [] @@ -129,11 +127,11 @@ raise BootFailure, "The initial mold failed to boot" end else build_app! bind_listeners! - after_promotion.call(self, Worker.new(nil, pid: $$).promoted!) + after_mold_fork.call(self, Worker.new(nil, pid: $$).promoted!) end if sync spawn_missing_workers # We could just return here as we'd register them later in #join. @@ -176,11 +174,11 @@ Pitchfork::HttpParser::DEFAULTS["rack.logger"] = @logger = obj end # add a given address to the +listeners+ set, idempotently # Allows workers to add a private, per-process listener via the - # after_fork hook. Very useful for debugging and testing. + # after_worker_fork hook. Very useful for debugging and testing. # +:tries+ may be specified as an option for the number of times # to retry, and +:delay+ may be specified as the time in seconds # to delay between retries. # A negative value for +:tries+ indicates the listen will be # retried indefinitely, this is useful when workers belonging to @@ -257,11 +255,11 @@ case message = @sig_queue.shift when nil # avoid murdering workers after our master process (or the # machine) comes out of suspend/hibernation - if (@last_check + @timeout) >= (@last_check = time_now) + if (@last_check + @timeout) >= (@last_check = Pitchfork.time_now) sleep_time = murder_lazy_workers else sleep_time = @timeout/2.0 + 1 @logger.debug("waiting #{sleep_time}s after suspend/hibernation") end @@ -287,38 +285,42 @@ self.worker_processes -= 1 if self.worker_processes > 0 when Message::WorkerSpawned worker = @children.update(message) # TODO: should we send a message to the worker to acknowledge? logger.info "worker=#{worker.nr} pid=#{worker.pid} registered" - when Message::WorkerPromoted + when Message::MoldSpawned + new_mold = @children.update(message) + logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned") + when Message::MoldReady old_molds = @children.molds - new_mold = @children.fetch(message.pid) - logger.info("worker=#{new_mold.nr} pid=#{new_mold.pid} promoted to a mold") - @children.update(message) + new_mold = @children.update(message) + logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready") old_molds.each do |old_mold| - logger.info("Terminating old mold pid=#{old_mold.pid}") + logger.info("Terminating old mold pid=#{old_mold.pid} gen=#{old_mold.generation}") old_mold.soft_kill(:QUIT) end else logger.error("Unexpected message in sig_queue #{message.inspect}") logger.error(@sig_queue.inspect) end end # Terminates all workers, but does not exit master process def stop(graceful = true) + @respawn = false wait_for_pending_workers self.listeners = [] - limit = time_now + timeout - until @children.workers.empty? || time_now > limit + limit = Pitchfork.time_now + timeout + until @children.workers.empty? || Pitchfork.time_now > limit if graceful soft_kill_each_child(:QUIT) else kill_each_child(:TERM) end - sleep(0.1) - reap_all_workers + if monitor_loop(false) == StopIteration + return StopIteration + end end kill_each_child(:KILL) @promotion_lock.unlink end @@ -391,11 +393,11 @@ end # forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File def murder_lazy_workers next_sleep = @timeout - 1 - now = time_now.to_i + now = Pitchfork.time_now(true) @children.workers.each do |worker| tick = worker.tick 0 == tick and next # skip workers that haven't processed any clients diff = now - tick tmp = @timeout - diff @@ -442,47 +444,32 @@ end def spawn_worker(worker, detach:) logger.info("worker=#{worker.nr} gen=#{worker.generation} spawning...") - pid = Pitchfork.clean_fork do - # We double fork so that the new worker is re-attached back - # to the master. - # This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4 - # or the master to be PID 1. - if detach && fork - exit - end + Pitchfork.fork_sibling do worker.pid = Process.pid after_fork_internal worker_loop(worker) - if worker.mold? - mold_loop(worker) - end - exit end - if detach - # If we double forked, we need to wait(2) so that the middle - # process doesn't end up a zombie. - Process.wait(pid) - end - worker end def spawn_initial_mold mold = Worker.new(nil) mold.create_socketpair! mold.pid = Pitchfork.clean_fork do + mold.pid = Process.pid @promotion_lock.try_lock mold.after_fork_in_child build_app! bind_listeners! mold_loop(mold) end + @promotion_lock.at_fork @children.register_mold(mold) end def spawn_missing_workers worker_nr = -1 @@ -492,11 +479,11 @@ end worker = Pitchfork::Worker.new(worker_nr) if REFORKING_AVAILABLE unless @children.mold&.spawn_worker(worker) - @logger.error("Failed to send a spawn_woker command") + @logger.error("Failed to send a spawn_worker command") end else spawn_worker(worker, detach: false) end # We could directly register workers when we spawn from the @@ -527,21 +514,22 @@ def restart_outdated_workers # If we're already in the middle of forking a new generation, we just continue return unless @children.mold - # We don't shutdown any outdated worker if any worker is already being spawned - # or a worker is exiting. Workers are only reforked one by one to minimize the - # impact on capacity. - # In the future we may want to use a dynamic limit, e.g. 10% of workers may be down at - # a time. - return if @children.pending_workers? - return if @children.workers.any?(&:exiting?) + # We don't shutdown any outdated worker if any worker is already being + # spawned or a worker is exiting. Only 10% of workers can be reforked at + # once to minimize the impact on capacity. + max_pending_workers = (worker_processes * 0.1).ceil + workers_to_restart = max_pending_workers - @children.restarting_workers_count - if outdated_worker = @children.workers.find { |w| w.generation < @children.mold.generation } - logger.info("worker=#{outdated_worker.nr} pid=#{outdated_worker.pid} restarting") - outdated_worker.soft_kill(:QUIT) + if workers_to_restart > 0 + outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation } + outdated_workers.each do |worker| + logger.info("worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} restarting") + worker.soft_kill(:QUIT) + end end end # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up @@ -646,28 +634,28 @@ exit_sigs.each { |sig| trap(sig) { exit!(0) } } exit!(0) if (@sig_queue & exit_sigs)[0] (@queue_sigs - exit_sigs).each { |sig| trap(sig, nil) } trap(:CHLD, 'DEFAULT') @sig_queue.clear - proc_name "worker[#{worker.nr}] (gen:#{worker.generation})" + proc_name "(gen:#{worker.generation}) worker[#{worker.nr}]" @children = nil - after_fork.call(self, worker) # can drop perms and create listeners + after_worker_fork.call(self, worker) # can drop perms and create listeners LISTENERS.each { |sock| sock.close_on_exec = true } @config = nil @listener_opts = @orig_app = nil readers = LISTENERS.dup readers << worker trap(:QUIT) { nuke_listeners!(readers) } readers end - def init_mold_process(worker) - proc_name "mold (gen: #{worker.generation})" - after_promotion.call(self, worker) - readers = [worker] + def init_mold_process(mold) + proc_name "(gen: #{mold.generation}) mold" + after_mold_fork.call(self, mold) + readers = [mold] trap(:QUIT) { nuke_listeners!(readers) } readers end if Pitchfork.const_defined?(:Waiter) @@ -689,87 +677,103 @@ waiter = prep_readers(readers) ready = readers.dup @after_worker_ready.call(self, worker) - begin - worker.tick = time_now.to_i - while sock = ready.shift - # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, - # but that will return false - client = sock.accept_nonblock(exception: false) - client = false if client == :wait_readable - if client - case client - when Message::PromoteWorker - if @promotion_lock.try_lock - logger.info("Refork asked by master, promoting ourselves") - worker.tick = time_now.to_i - return worker.promoted! + while readers[0] + begin + worker.tick = Pitchfork.time_now(true) + while sock = ready.shift + # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, + # but that will return false + client = sock.accept_nonblock(exception: false) + client = false if client == :wait_readable + if client + case client + when Message::PromoteWorker + spawn_mold(worker.generation) + when Message + worker.update(client) + else + process_client(client) + worker.increment_requests_count end - when Message - worker.update(client) - else - process_client(client) - worker.increment_requests_count + worker.tick = Pitchfork.time_now(true) end - worker.tick = time_now.to_i end - end - # timeout so we can .tick and keep parent from SIGKILL-ing us - worker.tick = time_now.to_i - if @refork_condition && !worker.outdated? - if @refork_condition.met?(worker, logger) - if @promotion_lock.try_lock - logger.info("Refork condition met, promoting ourselves") - return worker.promote! # We've been promoted we can exit the loop - else - # TODO: if we couldn't acquire the lock, we should backoff the refork_condition to avoid hammering the lock + # timeout so we can .tick and keep parent from SIGKILL-ing us + worker.tick = Pitchfork.time_now(true) + + if @refork_condition && !worker.outdated? + if @refork_condition.met?(worker, logger) + if spawn_mold(worker.generation) + logger.info("Refork condition met, promoting ourselves") + end + @refork_condition.backoff! end end + + waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved + rescue => e + Pitchfork.log_error(@logger, "listen loop error", e) if readers[0] end + end + end - waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved - rescue => e - Pitchfork.log_error(@logger, "listen loop error", e) if readers[0] - end while readers[0] + def spawn_mold(current_generation) + return false unless @promotion_lock.try_lock + + begin + Pitchfork.fork_sibling do + mold = Worker.new(nil, pid: Process.pid, generation: current_generation) + mold.promote! + mold.start_promotion(@control_socket[1]) + mold_loop(mold) + end + true + ensure + @promotion_lock.at_fork # We let the spawned mold own the lock + end end def mold_loop(mold) readers = init_mold_process(mold) waiter = prep_readers(readers) - mold.declare_promotion(@control_socket[1]) @promotion_lock.unlock ready = readers.dup - begin - mold.tick = time_now.to_i - while sock = ready.shift - # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, - # but that will return false - message = sock.accept_nonblock(exception: false) - case message - when false - # no message, keep looping - when Message::SpawnWorker - begin - spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true) - rescue => error - raise BootFailure, error.message + mold.finish_promotion(@control_socket[1]) + + while readers[0] + begin + mold.tick = Pitchfork.time_now(true) + while sock = ready.shift + # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, + # but that will return false + message = sock.accept_nonblock(exception: false) + case message + when false + # no message, keep looping + when Message::SpawnWorker + begin + spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true) + rescue => error + raise BootFailure, error.message + end + else + logger.error("Unexpected mold message #{message.inspect}") end - else - logger.error("Unexpected mold message #{message.inspect}") end - end - # timeout so we can .tick and keep parent from SIGKILL-ing us - mold.tick = time_now.to_i - waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved - rescue => e - Pitchfork.log_error(@logger, "mold loop error", e) if readers[0] - end while readers[0] + # timeout so we can .tick and keep parent from SIGKILL-ing us + mold.tick = Pitchfork.time_now(true) + waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved + rescue => e + Pitchfork.log_error(@logger, "mold loop error", e) if readers[0] + end + end end # delivers a signal to a worker and fails gracefully if the worker # is no longer running. def kill_worker(signal, wpid) @@ -817,12 +821,8 @@ @init_listeners << Pitchfork::Const::DEFAULT_LISTEN START_CTX[:argv] << "-l#{Pitchfork::Const::DEFAULT_LISTEN}" end listeners.each { |addr| listen(addr) } raise ArgumentError, "no listeners" if LISTENERS.empty? - end - - def time_now - Process.clock_gettime(Process::CLOCK_MONOTONIC) end end end