lib/pitchfork/http_server.rb in pitchfork-0.4.1 vs lib/pitchfork/http_server.rb in pitchfork-0.5.0
- old
+ new
@@ -1,22 +1,85 @@
# -*- encoding: binary -*-
require 'pitchfork/pitchfork_http'
require 'pitchfork/flock'
+require 'pitchfork/soft_timeout'
module Pitchfork
# This is the process manager of Pitchfork. This manages worker
# processes which in turn handle the I/O and application process.
# Listener sockets are started in the master process and shared with
# forked worker children.
class HttpServer
+ class TimeoutHandler
+ class Info
+ attr_reader :thread, :rack_env
+
+ def initialize(thread, rack_env)
+ @thread = thread
+ @rack_env = rack_env
+ end
+
+ def copy_thread_variables!
+ current_thread = Thread.current
+ @thread.keys.each do |key|
+ current_thread[key] = @thread[key]
+ end
+ @thread.thread_variables.each do |variable|
+ current_thread.thread_variable_set(variable, @thread.thread_variable_get(variable))
+ end
+ end
+ end
+
+ attr_writer :rack_env, :timeout_request # :nodoc:
+
+ def initialize(server, worker, callback) # :nodoc:
+ @server = server
+ @worker = worker
+ @callback = callback
+ @rack_env = nil
+ @timeout_request = nil
+ end
+
+ def inspect
+ "#<Pitchfork::HttpServer::TimeoutHandler##{object_id}>"
+ end
+
+ def call(original_thread) # :nodoc:
+ @server.logger.error("worker=#{@worker.nr} pid=#{@worker.pid} timed out, exiting")
+ if @callback
+ @callback.call(@server, @worker, Info.new(original_thread, @rack_env))
+ end
+ rescue => error
+ Pitchfork.log_error(@server.logger, "after_worker_timeout error", error)
+ ensure
+ exit
+ end
+
+ def finished # :nodoc:
+ @timeout_request.finished
+ end
+
+ def deadline
+ @timeout_request.deadline
+ end
+
+ def extend_deadline(extra_time)
+ extra_time = Integer(extra_time)
+ @worker.deadline += extra_time
+ @timeout_request.extend_deadline(extra_time)
+ self
+ end
+ end
+
# :stopdoc:
- attr_accessor :app, :timeout, :worker_processes,
+ attr_accessor :app, :timeout, :soft_timeout, :cleanup_timeout, :worker_processes,
:after_worker_fork, :after_mold_fork,
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
- attr_writer :after_worker_exit, :after_worker_ready, :refork_condition
+ attr_writer :after_worker_exit, :after_worker_ready, :after_request_complete, :refork_condition,
+ :after_worker_timeout
attr_reader :logger
include Pitchfork::SocketHelper
include Pitchfork::HttpResponse
@@ -392,29 +455,34 @@
listener_fds
end
# forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File
def murder_lazy_workers
- next_sleep = @timeout - 1
now = Pitchfork.time_now(true)
+ next_sleep = @timeout - 1
+
@children.workers.each do |worker|
- tick = worker.tick
- 0 == tick and next # skip workers that haven't processed any clients
- diff = now - tick
- tmp = @timeout - diff
- if tmp >= 0
- next_sleep > tmp and next_sleep = tmp
+ deadline = worker.deadline
+ if 0 == deadline # worker is idle
next
+ elsif deadline > now # worker still has time
+ time_left = deadline - now
+ if time_left < next_sleep
+ next_sleep = time_left
+ end
+ next
+ else # worker is out of time
+ next_sleep = 0
+ if worker.mold?
+ logger.error "mold pid=#{worker.pid} deadline=#{deadline} timed out, killing"
+ else
+ logger.error "worker=#{worker.nr} pid=#{worker.pid} deadline=#{deadline} timed out, killing"
+ end
+ kill_worker(:KILL, worker.pid) # take no prisoners for hard timeout violations
end
- next_sleep = 0
- if worker.mold?
- logger.error "mold pid=#{worker.pid} timeout (#{diff}s > #{@timeout}s), killing"
- else
- logger.error "worker=#{worker.nr} pid=#{worker.pid} timeout (#{diff}s > #{@timeout}s), killing"
- end
- kill_worker(:KILL, worker.pid) # take no prisoners for timeout violations
end
+
next_sleep <= 0 ? 1 : next_sleep
end
def trigger_refork
unless REFORKING_AVAILABLE
@@ -575,13 +643,16 @@
env.delete('HTTP_EXPECT'.freeze)
end
# once a client is accepted, it is processed in its entirety here
# in 3 easy steps: read request, call app, write app response
- def process_client(client)
+ def process_client(client, timeout_handler)
+ env = nil
@request = Pitchfork::HttpParser.new
env = @request.read(client)
+ timeout_handler.rack_env = env
+ env["pitchfork.timeout"] = timeout_handler
if early_hints
env["rack.early_hints"] = lambda do |headers|
e103_response_write(client, headers)
end
@@ -614,10 +685,11 @@
end
rescue => e
handle_error(client, e)
ensure
env["rack.after_reply"].each(&:call) if env
+ timeout_handler.finished
end
def nuke_listeners!(readers)
# only called from the worker, ordering is important here
tmp = readers.dup
@@ -682,11 +754,11 @@
ready = readers.dup
@after_worker_ready.call(self, worker)
while readers[0]
begin
- worker.tick = Pitchfork.time_now(true)
+ worker.update_deadline(@timeout)
while sock = ready.shift
# Pitchfork::Worker#accept_nonblock is not like accept(2) at all,
# but that will return false
client = sock.accept_nonblock(exception: false)
client = false if client == :wait_readable
@@ -695,19 +767,20 @@
when Message::PromoteWorker
spawn_mold(worker.generation)
when Message
worker.update(client)
else
- process_client(client)
+ process_client(client, prepare_timeout(worker))
+ @after_request_complete&.call(self, worker)
worker.increment_requests_count
end
- worker.tick = Pitchfork.time_now(true)
+ worker.update_deadline(@timeout)
end
end
- # timeout so we can .tick and keep parent from SIGKILL-ing us
- worker.tick = Pitchfork.time_now(true)
+ # timeout so we can update .deadline and keep parent from SIGKILL-ing us
+ worker.update_deadline(@timeout)
if @refork_condition && !worker.outdated?
if @refork_condition.met?(worker, logger)
if spawn_mold(worker.generation)
logger.info("Refork condition met, promoting ourselves")
@@ -752,11 +825,11 @@
mold.finish_promotion(@control_socket[1])
while readers[0]
begin
- mold.tick = Pitchfork.time_now(true)
+ mold.update_deadline(@timeout)
while sock = ready.shift
# Pitchfork::Worker#accept_nonblock is not like accept(2) at all,
# but that will return false
message = sock.accept_nonblock(exception: false)
case message
@@ -772,11 +845,11 @@
logger.error("Unexpected mold message #{message.inspect}")
end
end
# timeout so we can .tick and keep parent from SIGKILL-ing us
- mold.tick = Pitchfork.time_now(true)
+ mold.update_deadline(@timeout)
waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved
rescue => e
Pitchfork.log_error(@logger, "mold loop error", e) if readers[0]
end
end
@@ -829,8 +902,14 @@
@init_listeners << Pitchfork::Const::DEFAULT_LISTEN
START_CTX[:argv] << "-l#{Pitchfork::Const::DEFAULT_LISTEN}"
end
listeners.each { |addr| listen(addr) }
raise ArgumentError, "no listeners" if LISTENERS.empty?
+ end
+
+ def prepare_timeout(worker)
+ handler = TimeoutHandler.new(self, worker, @after_worker_timeout)
+ handler.timeout_request = SoftTimeout.request(@soft_timeout, handler)
+ handler
end
end
end