lib/pitchfork/http_server.rb in pitchfork-0.4.1 vs lib/pitchfork/http_server.rb in pitchfork-0.5.0

- old
+ new

@@ -1,22 +1,85 @@ # -*- encoding: binary -*- require 'pitchfork/pitchfork_http' require 'pitchfork/flock' +require 'pitchfork/soft_timeout' module Pitchfork # This is the process manager of Pitchfork. This manages worker # processes which in turn handle the I/O and application process. # Listener sockets are started in the master process and shared with # forked worker children. class HttpServer + class TimeoutHandler + class Info + attr_reader :thread, :rack_env + + def initialize(thread, rack_env) + @thread = thread + @rack_env = rack_env + end + + def copy_thread_variables! + current_thread = Thread.current + @thread.keys.each do |key| + current_thread[key] = @thread[key] + end + @thread.thread_variables.each do |variable| + current_thread.thread_variable_set(variable, @thread.thread_variable_get(variable)) + end + end + end + + attr_writer :rack_env, :timeout_request # :nodoc: + + def initialize(server, worker, callback) # :nodoc: + @server = server + @worker = worker + @callback = callback + @rack_env = nil + @timeout_request = nil + end + + def inspect + "#<Pitchfork::HttpServer::TimeoutHandler##{object_id}>" + end + + def call(original_thread) # :nodoc: + @server.logger.error("worker=#{@worker.nr} pid=#{@worker.pid} timed out, exiting") + if @callback + @callback.call(@server, @worker, Info.new(original_thread, @rack_env)) + end + rescue => error + Pitchfork.log_error(@server.logger, "after_worker_timeout error", error) + ensure + exit + end + + def finished # :nodoc: + @timeout_request.finished + end + + def deadline + @timeout_request.deadline + end + + def extend_deadline(extra_time) + extra_time = Integer(extra_time) + @worker.deadline += extra_time + @timeout_request.extend_deadline(extra_time) + self + end + end + # :stopdoc: - attr_accessor :app, :timeout, :worker_processes, + attr_accessor :app, :timeout, :soft_timeout, :cleanup_timeout, :worker_processes, :after_worker_fork, :after_mold_fork, :listener_opts, :children, :orig_app, :config, :ready_pipe, :default_middleware, :early_hints - attr_writer :after_worker_exit, :after_worker_ready, :refork_condition + attr_writer :after_worker_exit, :after_worker_ready, :after_request_complete, :refork_condition, + :after_worker_timeout attr_reader :logger include Pitchfork::SocketHelper include Pitchfork::HttpResponse @@ -392,29 +455,34 @@ listener_fds end # forcibly terminate all workers that haven't checked in in timeout seconds. The timeout is implemented using an unlinked File def murder_lazy_workers - next_sleep = @timeout - 1 now = Pitchfork.time_now(true) + next_sleep = @timeout - 1 + @children.workers.each do |worker| - tick = worker.tick - 0 == tick and next # skip workers that haven't processed any clients - diff = now - tick - tmp = @timeout - diff - if tmp >= 0 - next_sleep > tmp and next_sleep = tmp + deadline = worker.deadline + if 0 == deadline # worker is idle next + elsif deadline > now # worker still has time + time_left = deadline - now + if time_left < next_sleep + next_sleep = time_left + end + next + else # worker is out of time + next_sleep = 0 + if worker.mold? + logger.error "mold pid=#{worker.pid} deadline=#{deadline} timed out, killing" + else + logger.error "worker=#{worker.nr} pid=#{worker.pid} deadline=#{deadline} timed out, killing" + end + kill_worker(:KILL, worker.pid) # take no prisoners for hard timeout violations end - next_sleep = 0 - if worker.mold? - logger.error "mold pid=#{worker.pid} timeout (#{diff}s > #{@timeout}s), killing" - else - logger.error "worker=#{worker.nr} pid=#{worker.pid} timeout (#{diff}s > #{@timeout}s), killing" - end - kill_worker(:KILL, worker.pid) # take no prisoners for timeout violations end + next_sleep <= 0 ? 1 : next_sleep end def trigger_refork unless REFORKING_AVAILABLE @@ -575,13 +643,16 @@ env.delete('HTTP_EXPECT'.freeze) end # once a client is accepted, it is processed in its entirety here # in 3 easy steps: read request, call app, write app response - def process_client(client) + def process_client(client, timeout_handler) + env = nil @request = Pitchfork::HttpParser.new env = @request.read(client) + timeout_handler.rack_env = env + env["pitchfork.timeout"] = timeout_handler if early_hints env["rack.early_hints"] = lambda do |headers| e103_response_write(client, headers) end @@ -614,10 +685,11 @@ end rescue => e handle_error(client, e) ensure env["rack.after_reply"].each(&:call) if env + timeout_handler.finished end def nuke_listeners!(readers) # only called from the worker, ordering is important here tmp = readers.dup @@ -682,11 +754,11 @@ ready = readers.dup @after_worker_ready.call(self, worker) while readers[0] begin - worker.tick = Pitchfork.time_now(true) + worker.update_deadline(@timeout) while sock = ready.shift # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, # but that will return false client = sock.accept_nonblock(exception: false) client = false if client == :wait_readable @@ -695,19 +767,20 @@ when Message::PromoteWorker spawn_mold(worker.generation) when Message worker.update(client) else - process_client(client) + process_client(client, prepare_timeout(worker)) + @after_request_complete&.call(self, worker) worker.increment_requests_count end - worker.tick = Pitchfork.time_now(true) + worker.update_deadline(@timeout) end end - # timeout so we can .tick and keep parent from SIGKILL-ing us - worker.tick = Pitchfork.time_now(true) + # timeout so we can update .deadline and keep parent from SIGKILL-ing us + worker.update_deadline(@timeout) if @refork_condition && !worker.outdated? if @refork_condition.met?(worker, logger) if spawn_mold(worker.generation) logger.info("Refork condition met, promoting ourselves") @@ -752,11 +825,11 @@ mold.finish_promotion(@control_socket[1]) while readers[0] begin - mold.tick = Pitchfork.time_now(true) + mold.update_deadline(@timeout) while sock = ready.shift # Pitchfork::Worker#accept_nonblock is not like accept(2) at all, # but that will return false message = sock.accept_nonblock(exception: false) case message @@ -772,11 +845,11 @@ logger.error("Unexpected mold message #{message.inspect}") end end # timeout so we can .tick and keep parent from SIGKILL-ing us - mold.tick = Pitchfork.time_now(true) + mold.update_deadline(@timeout) waiter.get_readers(ready, readers, @timeout * 500) # to milliseconds, but halved rescue => e Pitchfork.log_error(@logger, "mold loop error", e) if readers[0] end end @@ -829,8 +902,14 @@ @init_listeners << Pitchfork::Const::DEFAULT_LISTEN START_CTX[:argv] << "-l#{Pitchfork::Const::DEFAULT_LISTEN}" end listeners.each { |addr| listen(addr) } raise ArgumentError, "no listeners" if LISTENERS.empty? + end + + def prepare_timeout(worker) + handler = TimeoutHandler.new(self, worker, @after_worker_timeout) + handler.timeout_request = SoftTimeout.request(@soft_timeout, handler) + handler end end end