lib/pitchfork/http_server.rb in pitchfork-0.6.0 vs lib/pitchfork/http_server.rb in pitchfork-0.7.0

- old
+ new

@@ -89,11 +89,11 @@ # all bound listener sockets # note: this is public used by raindrops, but not recommended for use # in new projects LISTENERS = [] - NOOP = '.' + NOOP = '.'.freeze # :startdoc: # This Hash is considered a stable interface and changing its contents # will allow you to switch between different installations of Pitchfork # or even different installations of the same applications without @@ -127,18 +127,21 @@ @exit_status = 0 @app = app @respawn = false @last_check = Pitchfork.time_now @promotion_lock = Flock.new("pitchfork-promotion") + Info.keep_io(@promotion_lock) options = options.dup @ready_pipe = options.delete(:ready_pipe) @init_listeners = options[:listeners].dup || [] options[:use_defaults] = true self.config = Pitchfork::Configurator.new(options) self.listener_opts = {} + proc_name role: 'monitor', status: START_CTX[:argv].join(' ') + # We use @control_socket differently in the master and worker processes: # # * The master process never closes or reinitializes this once # initialized. Signal handlers in the master process will write to # it to wake up the master from IO.select in exactly the same manner @@ -178,10 +181,11 @@ # This socketpair is used to wake us up from select(2) in #join when signals # are trapped. See trap_deferred. # It's also used by newly spawned children to send their soft_signal pipe # to the master when they are spawned. @control_socket.replace(Pitchfork.socketpair) + Info.keep_ios(@control_socket) @master_pid = $$ # setup signal handlers before writing pid file in case people get # trigger happy and send signals as soon as the pid file exists. # Note that signals don't actually get handled until the #join method @@ -262,10 +266,11 @@ unless TCPServer === io || UNIXServer === io io.autoclose = false io = server_cast(io) end logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}" + Info.keep_io(io) LISTENERS << io io rescue Errno::EADDRINUSE => err logger.error "adding listener failed addr=#{address} (in use)" raise err if tries == 0 @@ -285,11 +290,12 @@ # one-at-a-time time and we'll happily drop signals in case somebody # is signalling us too often. def join @respawn = true - proc_name 'master' + proc_name role: 'monitor', status: START_CTX[:argv].join(' ') + logger.info "master process ready" # test_exec.rb relies on this message if @ready_pipe begin @ready_pipe.syswrite($$.to_s) rescue => e @@ -345,11 +351,15 @@ SharedMemory.shutting_down! logger.info "#{message} received, starting immediate shutdown" stop(false) return StopIteration when :USR2 # trigger a promotion - trigger_refork + if @respawn + trigger_refork + else + logger.error "Can't trigger a refork as the server is shutting down" + end when :TTIN @respawn = true self.worker_processes += 1 when :TTOU self.worker_processes -= 1 if self.worker_processes > 0 @@ -374,10 +384,11 @@ end end # Terminates all workers, but does not exit master process def stop(graceful = true) + proc_name role: 'monitor', status: 'shutting down' @respawn = false SharedMemory.shutting_down! wait_for_pending_workers self.listeners = [] limit = Pitchfork.time_now + timeout @@ -394,10 +405,12 @@ kill_each_child(:KILL) @promotion_lock.unlink end def worker_exit(worker) + proc_name status: "exiting" + if @before_worker_exit begin @before_worker_exit.call(self, worker) rescue => error Pitchfork.log_error(logger, "before_worker_exit error", error) @@ -512,20 +525,19 @@ next_sleep <= 0 ? 1 : next_sleep end def trigger_refork unless REFORKING_AVAILABLE - logger.error("This system doesn't support PR_SET_CHILD_SUBREAPER, can't promote a worker") + logger.error("This system doesn't support PR_SET_CHILD_SUBREAPER, can't refork") end unless @children.pending_promotion? if new_mold = @children.fresh_workers.first @children.promote(new_mold) else logger.error("No children at all???") end - else end end def after_fork_internal @promotion_lock.at_fork @@ -620,12 +632,17 @@ workers_to_restart = max_pending_workers - @children.restarting_workers_count if workers_to_restart > 0 outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation } outdated_workers.each do |worker| - logger.info("worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} restarting") - worker.soft_kill(:TERM) + if worker.soft_kill(:TERM) + logger.info("Sent SIGTERM to worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation}") + workers_to_restart -= 1 + else + logger.info("Failed to send SIGTERM to worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation}") + end + break if workers_to_restart <= 0 end end end # if we get any error, try to write something back to the client @@ -676,10 +693,13 @@ # in 3 easy steps: read request, call app, write app response def process_client(client, timeout_handler) env = nil @request = Pitchfork::HttpParser.new env = @request.read(client) + + proc_name status: "processing: #{env["PATH_INFO"]}" + timeout_handler.rack_env = env env["pitchfork.timeout"] = timeout_handler if early_hints env["rack.early_hints"] = lambda do |headers| @@ -690,16 +710,16 @@ env["rack.after_reply"] = [] status, headers, body = @app.call(env) begin - return if @request.hijacked? + return env if @request.hijacked? if 100 == status.to_i e100_response_write(client, env) status, headers, body = @app.call(env) - return if @request.hijacked? + return env if @request.hijacked? end @request.headers? or headers = nil http_response_write(client, status, headers, body, @request) ensure body.respond_to?(:close) and body.close @@ -710,15 +730,18 @@ client.shutdown # in case of fork() in Rack app rescue Errno::ENOTCONN end client.close # flush and uncork socket immediately, no keepalive end + env rescue => e handle_error(client, e) + env ensure env["rack.after_reply"].each(&:call) if env timeout_handler.finished + env end def nuke_listeners!(readers) # only called from the worker, ordering is important here tmp = readers.dup @@ -729,20 +752,20 @@ # gets rid of stuff the worker has no business keeping track of # to free some resources and drops all sig handlers. # traps for USR2, and HUP may be set in the after_fork Proc # by the user. def init_worker_process(worker) + proc_name role: "(gen:#{worker.generation}) worker[#{worker.nr}]", status: "init" worker.reset worker.register_to_master(@control_socket[1]) # we'll re-trap :QUIT and :TERM later for graceful shutdown iff we accept clients exit_sigs = [ :QUIT, :TERM, :INT ] exit_sigs.each { |sig| trap(sig) { exit!(0) } } exit!(0) if (@sig_queue & exit_sigs)[0] (@queue_sigs - exit_sigs).each { |sig| trap(sig, nil) } trap(:CHLD, 'DEFAULT') @sig_queue.clear - proc_name "(gen:#{worker.generation}) worker[#{worker.nr}]" @children = nil after_worker_fork.call(self, worker) # can drop perms and create listeners LISTENERS.each { |sock| sock.close_on_exec = true } @@ -754,11 +777,11 @@ trap(:TERM) { nuke_listeners!(readers) } readers end def init_mold_process(mold) - proc_name "(gen: #{mold.generation}) mold" + proc_name role: "(gen:#{mold.generation}) mold", status: "ready" after_mold_fork.call(self, mold) readers = [mold] trap(:QUIT) { nuke_listeners!(readers) } trap(:TERM) { nuke_listeners!(readers) } readers @@ -783,10 +806,12 @@ waiter = prep_readers(readers) ready = readers.dup @after_worker_ready.call(self, worker) + proc_name status: "ready" + while readers[0] begin worker.update_deadline(@timeout) while sock = ready.shift # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, @@ -794,34 +819,39 @@ client = sock.accept_nonblock(exception: false) client = false if client == :wait_readable if client case client when Message::PromoteWorker - spawn_mold(worker.generation) + if Info.fork_safe? + spawn_mold(worker.generation) + else + logger.error("worker=#{worker.nr} gen=#{worker.generation} is no longer fork safe, can't refork") + end when Message worker.update(client) else - process_client(client, prepare_timeout(worker)) - @after_request_complete&.call(self, worker) + request_env = process_client(client, prepare_timeout(worker)) + @after_request_complete&.call(self, worker, request_env) worker.increment_requests_count end worker.update_deadline(@timeout) end end # timeout so we can update .deadline and keep parent from SIGKILL-ing us worker.update_deadline(@timeout) - if @refork_condition && !worker.outdated? + if @refork_condition && Info.fork_safe? && !worker.outdated? if @refork_condition.met?(worker, logger) if spawn_mold(worker.generation) logger.info("Refork condition met, promoting ourselves") end @refork_condition.backoff! end end + proc_name status: "waiting" waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved rescue => e Pitchfork.log_error(@logger, "listen loop error", e) if readers[0] end end @@ -909,22 +939,26 @@ end def build_app! return unless app.respond_to?(:arity) + proc_name status: "booting" + self.app = case app.arity when 0 app.call when 2 app.call(nil, self) when 1 app # already a rack app end end - def proc_name(tag) - $0 = ([ File.basename(START_CTX[0]), tag - ]).concat(START_CTX[:argv]).join(' ') + def proc_name(role: nil, status: nil) + @proctitle_role = role if role + @proctitle_status = status if status + + Process.setproctitle("#{File.basename(START_CTX[0])} #{@proctitle_role} - #{@proctitle_status}") end def bind_listeners! listeners = config[:listeners].dup if listeners.empty?