lib/pitchfork/http_server.rb in pitchfork-0.5.0 vs lib/pitchfork/http_server.rb in pitchfork-0.6.0
- old
+ new
@@ -1,9 +1,11 @@
# -*- encoding: binary -*-
require 'pitchfork/pitchfork_http'
require 'pitchfork/flock'
require 'pitchfork/soft_timeout'
+require 'pitchfork/shared_memory'
+require 'pitchfork/info'
module Pitchfork
# This is the process manager of Pitchfork. This manages worker
# processes which in turn handle the I/O and application process.
# Listener sockets are started in the master process and shared with
@@ -42,18 +44,19 @@
def inspect
"#<Pitchfork::HttpServer::TimeoutHandler##{object_id}>"
end
def call(original_thread) # :nodoc:
- @server.logger.error("worker=#{@worker.nr} pid=#{@worker.pid} timed out, exiting")
- if @callback
- @callback.call(@server, @worker, Info.new(original_thread, @rack_env))
+ begin
+ @server.logger.error("worker=#{@worker.nr} pid=#{@worker.pid} timed out, exiting")
+ if @callback
+ @callback.call(@server, @worker, Info.new(original_thread, @rack_env))
+ end
+ rescue => error
+ Pitchfork.log_error(@server.logger, "after_worker_timeout error", error)
end
- rescue => error
- Pitchfork.log_error(@server.logger, "after_worker_timeout error", error)
- ensure
- exit
+ @server.worker_exit(@worker)
end
def finished # :nodoc:
@timeout_request.finished
end
@@ -74,12 +77,12 @@
attr_accessor :app, :timeout, :soft_timeout, :cleanup_timeout, :worker_processes,
:after_worker_fork, :after_mold_fork,
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
- attr_writer :after_worker_exit, :after_worker_ready, :after_request_complete, :refork_condition,
- :after_worker_timeout
+ attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete,
+ :refork_condition, :after_worker_timeout, :after_worker_hard_timeout
attr_reader :logger
include Pitchfork::SocketHelper
include Pitchfork::HttpResponse
@@ -161,11 +164,13 @@
config.commit!(self, :skip => [:listeners, :pid])
@orig_app = app
# list of signals we care about and trap in master.
@queue_sigs = [
:QUIT, :INT, :TERM, :USR2, :TTIN, :TTOU ]
- Worker.preallocate_drops(worker_processes)
+
+ Info.workers_count = worker_processes
+ SharedMemory.preallocate_drops(worker_processes)
end
# Runs the thing. Returns self so you can run join on it
def start(sync = true)
Pitchfork.enable_child_subreaper # noop if not supported
@@ -310,11 +315,11 @@
reap_all_workers
if REFORKING_AVAILABLE && @respawn && @children.molds.empty?
logger.info("No mold alive, shutting down")
@exit_status = 1
- @sig_queue << :QUIT
+ @sig_queue << :TERM
@respawn = false
end
case message = @sig_queue.shift
when nil
@@ -330,14 +335,16 @@
maintain_worker_count
restart_outdated_workers if REFORKING_AVAILABLE
end
master_sleep(sleep_time) if sleep
- when :QUIT # graceful shutdown
- logger.info "QUIT received, starting graceful shutdown"
+ when :QUIT, :TERM # graceful shutdown
+ SharedMemory.shutting_down!
+ logger.info "#{message} received, starting graceful shutdown"
return StopIteration
- when :TERM, :INT # immediate shutdown
+ when :INT # immediate shutdown
+ SharedMemory.shutting_down!
logger.info "#{message} received, starting immediate shutdown"
stop(false)
return StopIteration
when :USR2 # trigger a promotion
trigger_refork
@@ -357,38 +364,50 @@
old_molds = @children.molds
new_mold = @children.update(message)
logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready")
old_molds.each do |old_mold|
logger.info("Terminating old mold pid=#{old_mold.pid} gen=#{old_mold.generation}")
- old_mold.soft_kill(:QUIT)
+ old_mold.soft_kill(:TERM)
end
else
logger.error("Unexpected message in sig_queue #{message.inspect}")
logger.error(@sig_queue.inspect)
end
end
# Terminates all workers, but does not exit master process
def stop(graceful = true)
@respawn = false
+ SharedMemory.shutting_down!
wait_for_pending_workers
self.listeners = []
limit = Pitchfork.time_now + timeout
until @children.workers.empty? || Pitchfork.time_now > limit
if graceful
- soft_kill_each_child(:QUIT)
+ soft_kill_each_child(:TERM)
else
- kill_each_child(:TERM)
+ kill_each_child(:INT)
end
if monitor_loop(false) == StopIteration
return StopIteration
end
end
kill_each_child(:KILL)
@promotion_lock.unlink
end
+ def worker_exit(worker)
+ if @before_worker_exit
+ begin
+ @before_worker_exit.call(self, worker)
+ rescue => error
+ Pitchfork.log_error(logger, "before_worker_exit error", error)
+ end
+ end
+ Process.exit
+ end
+
def rewindable_input
Pitchfork::HttpParser.input_class.method_defined?(:rewind)
end
def rewindable_input=(bool)
@@ -471,14 +490,23 @@
end
next
else # worker is out of time
next_sleep = 0
if worker.mold?
- logger.error "mold pid=#{worker.pid} deadline=#{deadline} timed out, killing"
+ logger.error "mold pid=#{worker.pid} timed out, killing"
else
- logger.error "worker=#{worker.nr} pid=#{worker.pid} deadline=#{deadline} timed out, killing"
+ logger.error "worker=#{worker.nr} pid=#{worker.pid} timed out, killing"
end
+
+ if @after_worker_hard_timeout
+ begin
+ @after_worker_hard_timeout.call(self, worker)
+ rescue => error
+ Pitchfork.log_error(@logger, "after_worker_hard_timeout callback", error)
+ end
+ end
+
kill_worker(:KILL, worker.pid) # take no prisoners for hard timeout violations
end
end
next_sleep <= 0 ? 1 : next_sleep
@@ -517,10 +545,11 @@
Pitchfork.fork_sibling do
worker.pid = Process.pid
after_fork_internal
worker_loop(worker)
+ worker_exit(worker)
end
worker
end
@@ -575,11 +604,11 @@
end
def maintain_worker_count
(off = @children.workers_count - worker_processes) == 0 and return
off < 0 and return spawn_missing_workers
- @children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:QUIT) }
+ @children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:TERM) }
end
def restart_outdated_workers
# If we're already in the middle of forking a new generation, we just continue
return unless @children.mold
@@ -592,11 +621,11 @@
if workers_to_restart > 0
outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation }
outdated_workers.each do |worker|
logger.info("worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} restarting")
- worker.soft_kill(:QUIT)
+ worker.soft_kill(:TERM)
end
end
end
# if we get any error, try to write something back to the client
@@ -702,11 +731,11 @@
# traps for USR2, and HUP may be set in the after_fork Proc
# by the user.
def init_worker_process(worker)
worker.reset
worker.register_to_master(@control_socket[1])
- # we'll re-trap :QUIT later for graceful shutdown iff we accept clients
+ # we'll re-trap :QUIT and :TERM later for graceful shutdown iff we accept clients
exit_sigs = [ :QUIT, :TERM, :INT ]
exit_sigs.each { |sig| trap(sig) { exit!(0) } }
exit!(0) if (@sig_queue & exit_sigs)[0]
(@queue_sigs - exit_sigs).each { |sig| trap(sig, nil) }
trap(:CHLD, 'DEFAULT')
@@ -720,17 +749,19 @@
@config = nil
@listener_opts = @orig_app = nil
readers = LISTENERS.dup
readers << worker
trap(:QUIT) { nuke_listeners!(readers) }
+ trap(:TERM) { nuke_listeners!(readers) }
readers
end
def init_mold_process(mold)
proc_name "(gen: #{mold.generation}) mold"
after_mold_fork.call(self, mold)
readers = [mold]
trap(:QUIT) { nuke_listeners!(readers) }
+ trap(:TERM) { nuke_listeners!(readers) }
readers
end
if Pitchfork.const_defined?(:Waiter)
def prep_readers(readers)