lib/pitchfork/http_server.rb in pitchfork-0.5.0 vs lib/pitchfork/http_server.rb in pitchfork-0.6.0

- old
+ new

@@ -1,9 +1,11 @@ # -*- encoding: binary -*- require 'pitchfork/pitchfork_http' require 'pitchfork/flock' require 'pitchfork/soft_timeout' +require 'pitchfork/shared_memory' +require 'pitchfork/info' module Pitchfork # This is the process manager of Pitchfork. This manages worker # processes which in turn handle the I/O and application process. # Listener sockets are started in the master process and shared with @@ -42,18 +44,19 @@ def inspect "#<Pitchfork::HttpServer::TimeoutHandler##{object_id}>" end def call(original_thread) # :nodoc: - @server.logger.error("worker=#{@worker.nr} pid=#{@worker.pid} timed out, exiting") - if @callback - @callback.call(@server, @worker, Info.new(original_thread, @rack_env)) + begin + @server.logger.error("worker=#{@worker.nr} pid=#{@worker.pid} timed out, exiting") + if @callback + @callback.call(@server, @worker, Info.new(original_thread, @rack_env)) + end + rescue => error + Pitchfork.log_error(@server.logger, "after_worker_timeout error", error) end - rescue => error - Pitchfork.log_error(@server.logger, "after_worker_timeout error", error) - ensure - exit + @server.worker_exit(@worker) end def finished # :nodoc: @timeout_request.finished end @@ -74,12 +77,12 @@ attr_accessor :app, :timeout, :soft_timeout, :cleanup_timeout, :worker_processes, :after_worker_fork, :after_mold_fork, :listener_opts, :children, :orig_app, :config, :ready_pipe, :default_middleware, :early_hints - attr_writer :after_worker_exit, :after_worker_ready, :after_request_complete, :refork_condition, - :after_worker_timeout + attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete, + :refork_condition, :after_worker_timeout, :after_worker_hard_timeout attr_reader :logger include Pitchfork::SocketHelper include Pitchfork::HttpResponse @@ -161,11 +164,13 @@ config.commit!(self, :skip => [:listeners, :pid]) @orig_app = app # list of signals we care about and trap in master. @queue_sigs = [ :QUIT, :INT, :TERM, :USR2, :TTIN, :TTOU ] - Worker.preallocate_drops(worker_processes) + + Info.workers_count = worker_processes + SharedMemory.preallocate_drops(worker_processes) end # Runs the thing. Returns self so you can run join on it def start(sync = true) Pitchfork.enable_child_subreaper # noop if not supported @@ -310,11 +315,11 @@ reap_all_workers if REFORKING_AVAILABLE && @respawn && @children.molds.empty? logger.info("No mold alive, shutting down") @exit_status = 1 - @sig_queue << :QUIT + @sig_queue << :TERM @respawn = false end case message = @sig_queue.shift when nil @@ -330,14 +335,16 @@ maintain_worker_count restart_outdated_workers if REFORKING_AVAILABLE end master_sleep(sleep_time) if sleep - when :QUIT # graceful shutdown - logger.info "QUIT received, starting graceful shutdown" + when :QUIT, :TERM # graceful shutdown + SharedMemory.shutting_down! + logger.info "#{message} received, starting graceful shutdown" return StopIteration - when :TERM, :INT # immediate shutdown + when :INT # immediate shutdown + SharedMemory.shutting_down! logger.info "#{message} received, starting immediate shutdown" stop(false) return StopIteration when :USR2 # trigger a promotion trigger_refork @@ -357,38 +364,50 @@ old_molds = @children.molds new_mold = @children.update(message) logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready") old_molds.each do |old_mold| logger.info("Terminating old mold pid=#{old_mold.pid} gen=#{old_mold.generation}") - old_mold.soft_kill(:QUIT) + old_mold.soft_kill(:TERM) end else logger.error("Unexpected message in sig_queue #{message.inspect}") logger.error(@sig_queue.inspect) end end # Terminates all workers, but does not exit master process def stop(graceful = true) @respawn = false + SharedMemory.shutting_down! wait_for_pending_workers self.listeners = [] limit = Pitchfork.time_now + timeout until @children.workers.empty? || Pitchfork.time_now > limit if graceful - soft_kill_each_child(:QUIT) + soft_kill_each_child(:TERM) else - kill_each_child(:TERM) + kill_each_child(:INT) end if monitor_loop(false) == StopIteration return StopIteration end end kill_each_child(:KILL) @promotion_lock.unlink end + def worker_exit(worker) + if @before_worker_exit + begin + @before_worker_exit.call(self, worker) + rescue => error + Pitchfork.log_error(logger, "before_worker_exit error", error) + end + end + Process.exit + end + def rewindable_input Pitchfork::HttpParser.input_class.method_defined?(:rewind) end def rewindable_input=(bool) @@ -471,14 +490,23 @@ end next else # worker is out of time next_sleep = 0 if worker.mold? - logger.error "mold pid=#{worker.pid} deadline=#{deadline} timed out, killing" + logger.error "mold pid=#{worker.pid} timed out, killing" else - logger.error "worker=#{worker.nr} pid=#{worker.pid} deadline=#{deadline} timed out, killing" + logger.error "worker=#{worker.nr} pid=#{worker.pid} timed out, killing" end + + if @after_worker_hard_timeout + begin + @after_worker_hard_timeout.call(self, worker) + rescue => error + Pitchfork.log_error(@logger, "after_worker_hard_timeout callback", error) + end + end + kill_worker(:KILL, worker.pid) # take no prisoners for hard timeout violations end end next_sleep <= 0 ? 1 : next_sleep @@ -517,10 +545,11 @@ Pitchfork.fork_sibling do worker.pid = Process.pid after_fork_internal worker_loop(worker) + worker_exit(worker) end worker end @@ -575,11 +604,11 @@ end def maintain_worker_count (off = @children.workers_count - worker_processes) == 0 and return off < 0 and return spawn_missing_workers - @children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:QUIT) } + @children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:TERM) } end def restart_outdated_workers # If we're already in the middle of forking a new generation, we just continue return unless @children.mold @@ -592,11 +621,11 @@ if workers_to_restart > 0 outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation } outdated_workers.each do |worker| logger.info("worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} restarting") - worker.soft_kill(:QUIT) + worker.soft_kill(:TERM) end end end # if we get any error, try to write something back to the client @@ -702,11 +731,11 @@ # traps for USR2, and HUP may be set in the after_fork Proc # by the user. def init_worker_process(worker) worker.reset worker.register_to_master(@control_socket[1]) - # we'll re-trap :QUIT later for graceful shutdown iff we accept clients + # we'll re-trap :QUIT and :TERM later for graceful shutdown iff we accept clients exit_sigs = [ :QUIT, :TERM, :INT ] exit_sigs.each { |sig| trap(sig) { exit!(0) } } exit!(0) if (@sig_queue & exit_sigs)[0] (@queue_sigs - exit_sigs).each { |sig| trap(sig, nil) } trap(:CHLD, 'DEFAULT') @@ -720,17 +749,19 @@ @config = nil @listener_opts = @orig_app = nil readers = LISTENERS.dup readers << worker trap(:QUIT) { nuke_listeners!(readers) } + trap(:TERM) { nuke_listeners!(readers) } readers end def init_mold_process(mold) proc_name "(gen: #{mold.generation}) mold" after_mold_fork.call(self, mold) readers = [mold] trap(:QUIT) { nuke_listeners!(readers) } + trap(:TERM) { nuke_listeners!(readers) } readers end if Pitchfork.const_defined?(:Waiter) def prep_readers(readers)