lib/pitchfork/http_server.rb in pitchfork-0.6.0 vs lib/pitchfork/http_server.rb in pitchfork-0.7.0
- old
+ new
@@ -89,11 +89,11 @@
# all bound listener sockets
# note: this is public used by raindrops, but not recommended for use
# in new projects
LISTENERS = []
- NOOP = '.'
+ NOOP = '.'.freeze
# :startdoc:
# This Hash is considered a stable interface and changing its contents
# will allow you to switch between different installations of Pitchfork
# or even different installations of the same applications without
@@ -127,18 +127,21 @@
@exit_status = 0
@app = app
@respawn = false
@last_check = Pitchfork.time_now
@promotion_lock = Flock.new("pitchfork-promotion")
+ Info.keep_io(@promotion_lock)
options = options.dup
@ready_pipe = options.delete(:ready_pipe)
@init_listeners = options[:listeners].dup || []
options[:use_defaults] = true
self.config = Pitchfork::Configurator.new(options)
self.listener_opts = {}
+ proc_name role: 'monitor', status: START_CTX[:argv].join(' ')
+
# We use @control_socket differently in the master and worker processes:
#
# * The master process never closes or reinitializes this once
# initialized. Signal handlers in the master process will write to
# it to wake up the master from IO.select in exactly the same manner
@@ -178,10 +181,11 @@
# This socketpair is used to wake us up from select(2) in #join when signals
# are trapped. See trap_deferred.
# It's also used by newly spawned children to send their soft_signal pipe
# to the master when they are spawned.
@control_socket.replace(Pitchfork.socketpair)
+ Info.keep_ios(@control_socket)
@master_pid = $$
# setup signal handlers before writing pid file in case people get
# trigger happy and send signals as soon as the pid file exists.
# Note that signals don't actually get handled until the #join method
@@ -262,10 +266,11 @@
unless TCPServer === io || UNIXServer === io
io.autoclose = false
io = server_cast(io)
end
logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
+ Info.keep_io(io)
LISTENERS << io
io
rescue Errno::EADDRINUSE => err
logger.error "adding listener failed addr=#{address} (in use)"
raise err if tries == 0
@@ -285,11 +290,12 @@
# one-at-a-time time and we'll happily drop signals in case somebody
# is signalling us too often.
def join
@respawn = true
- proc_name 'master'
+ proc_name role: 'monitor', status: START_CTX[:argv].join(' ')
+
logger.info "master process ready" # test_exec.rb relies on this message
if @ready_pipe
begin
@ready_pipe.syswrite($$.to_s)
rescue => e
@@ -345,11 +351,15 @@
SharedMemory.shutting_down!
logger.info "#{message} received, starting immediate shutdown"
stop(false)
return StopIteration
when :USR2 # trigger a promotion
- trigger_refork
+ if @respawn
+ trigger_refork
+ else
+ logger.error "Can't trigger a refork as the server is shutting down"
+ end
when :TTIN
@respawn = true
self.worker_processes += 1
when :TTOU
self.worker_processes -= 1 if self.worker_processes > 0
@@ -374,10 +384,11 @@
end
end
# Terminates all workers, but does not exit master process
def stop(graceful = true)
+ proc_name role: 'monitor', status: 'shutting down'
@respawn = false
SharedMemory.shutting_down!
wait_for_pending_workers
self.listeners = []
limit = Pitchfork.time_now + timeout
@@ -394,10 +405,12 @@
kill_each_child(:KILL)
@promotion_lock.unlink
end
def worker_exit(worker)
+ proc_name status: "exiting"
+
if @before_worker_exit
begin
@before_worker_exit.call(self, worker)
rescue => error
Pitchfork.log_error(logger, "before_worker_exit error", error)
@@ -512,20 +525,19 @@
next_sleep <= 0 ? 1 : next_sleep
end
def trigger_refork
unless REFORKING_AVAILABLE
- logger.error("This system doesn't support PR_SET_CHILD_SUBREAPER, can't promote a worker")
+ logger.error("This system doesn't support PR_SET_CHILD_SUBREAPER, can't refork")
end
unless @children.pending_promotion?
if new_mold = @children.fresh_workers.first
@children.promote(new_mold)
else
logger.error("No children at all???")
end
- else
end
end
def after_fork_internal
@promotion_lock.at_fork
@@ -620,12 +632,17 @@
workers_to_restart = max_pending_workers - @children.restarting_workers_count
if workers_to_restart > 0
outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation }
outdated_workers.each do |worker|
- logger.info("worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} restarting")
- worker.soft_kill(:TERM)
+ if worker.soft_kill(:TERM)
+ logger.info("Sent SIGTERM to worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation}")
+ workers_to_restart -= 1
+ else
+ logger.info("Failed to send SIGTERM to worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation}")
+ end
+ break if workers_to_restart <= 0
end
end
end
# if we get any error, try to write something back to the client
@@ -676,10 +693,13 @@
# in 3 easy steps: read request, call app, write app response
def process_client(client, timeout_handler)
env = nil
@request = Pitchfork::HttpParser.new
env = @request.read(client)
+
+ proc_name status: "processing: #{env["PATH_INFO"]}"
+
timeout_handler.rack_env = env
env["pitchfork.timeout"] = timeout_handler
if early_hints
env["rack.early_hints"] = lambda do |headers|
@@ -690,16 +710,16 @@
env["rack.after_reply"] = []
status, headers, body = @app.call(env)
begin
- return if @request.hijacked?
+ return env if @request.hijacked?
if 100 == status.to_i
e100_response_write(client, env)
status, headers, body = @app.call(env)
- return if @request.hijacked?
+ return env if @request.hijacked?
end
@request.headers? or headers = nil
http_response_write(client, status, headers, body, @request)
ensure
body.respond_to?(:close) and body.close
@@ -710,15 +730,18 @@
client.shutdown # in case of fork() in Rack app
rescue Errno::ENOTCONN
end
client.close # flush and uncork socket immediately, no keepalive
end
+ env
rescue => e
handle_error(client, e)
+ env
ensure
env["rack.after_reply"].each(&:call) if env
timeout_handler.finished
+ env
end
def nuke_listeners!(readers)
# only called from the worker, ordering is important here
tmp = readers.dup
@@ -729,20 +752,20 @@
# gets rid of stuff the worker has no business keeping track of
# to free some resources and drops all sig handlers.
# traps for USR2, and HUP may be set in the after_fork Proc
# by the user.
def init_worker_process(worker)
+ proc_name role: "(gen:#{worker.generation}) worker[#{worker.nr}]", status: "init"
worker.reset
worker.register_to_master(@control_socket[1])
# we'll re-trap :QUIT and :TERM later for graceful shutdown iff we accept clients
exit_sigs = [ :QUIT, :TERM, :INT ]
exit_sigs.each { |sig| trap(sig) { exit!(0) } }
exit!(0) if (@sig_queue & exit_sigs)[0]
(@queue_sigs - exit_sigs).each { |sig| trap(sig, nil) }
trap(:CHLD, 'DEFAULT')
@sig_queue.clear
- proc_name "(gen:#{worker.generation}) worker[#{worker.nr}]"
@children = nil
after_worker_fork.call(self, worker) # can drop perms and create listeners
LISTENERS.each { |sock| sock.close_on_exec = true }
@@ -754,11 +777,11 @@
trap(:TERM) { nuke_listeners!(readers) }
readers
end
def init_mold_process(mold)
- proc_name "(gen: #{mold.generation}) mold"
+ proc_name role: "(gen:#{mold.generation}) mold", status: "ready"
after_mold_fork.call(self, mold)
readers = [mold]
trap(:QUIT) { nuke_listeners!(readers) }
trap(:TERM) { nuke_listeners!(readers) }
readers
@@ -783,10 +806,12 @@
waiter = prep_readers(readers)
ready = readers.dup
@after_worker_ready.call(self, worker)
+ proc_name status: "ready"
+
while readers[0]
begin
worker.update_deadline(@timeout)
while sock = ready.shift
# Pitchfork::Worker#accept_nonblock is not like accept(2) at all,
@@ -794,34 +819,39 @@
client = sock.accept_nonblock(exception: false)
client = false if client == :wait_readable
if client
case client
when Message::PromoteWorker
- spawn_mold(worker.generation)
+ if Info.fork_safe?
+ spawn_mold(worker.generation)
+ else
+ logger.error("worker=#{worker.nr} gen=#{worker.generation} is no longer fork safe, can't refork")
+ end
when Message
worker.update(client)
else
- process_client(client, prepare_timeout(worker))
- @after_request_complete&.call(self, worker)
+ request_env = process_client(client, prepare_timeout(worker))
+ @after_request_complete&.call(self, worker, request_env)
worker.increment_requests_count
end
worker.update_deadline(@timeout)
end
end
# timeout so we can update .deadline and keep parent from SIGKILL-ing us
worker.update_deadline(@timeout)
- if @refork_condition && !worker.outdated?
+ if @refork_condition && Info.fork_safe? && !worker.outdated?
if @refork_condition.met?(worker, logger)
if spawn_mold(worker.generation)
logger.info("Refork condition met, promoting ourselves")
end
@refork_condition.backoff!
end
end
+ proc_name status: "waiting"
waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
rescue => e
Pitchfork.log_error(@logger, "listen loop error", e) if readers[0]
end
end
@@ -909,22 +939,26 @@
end
def build_app!
return unless app.respond_to?(:arity)
+ proc_name status: "booting"
+
self.app = case app.arity
when 0
app.call
when 2
app.call(nil, self)
when 1
app # already a rack app
end
end
- def proc_name(tag)
- $0 = ([ File.basename(START_CTX[0]), tag
- ]).concat(START_CTX[:argv]).join(' ')
+ def proc_name(role: nil, status: nil)
+ @proctitle_role = role if role
+ @proctitle_status = status if status
+
+ Process.setproctitle("#{File.basename(START_CTX[0])} #{@proctitle_role} - #{@proctitle_status}")
end
def bind_listeners!
listeners = config[:listeners].dup
if listeners.empty?