lib/puma/server.rb in piesync-puma-3.12.6.1 vs lib/puma/server.rb in piesync-puma-5.4.0.1

- old
+ new

@@ -4,144 +4,179 @@ require 'puma/thread_pool' require 'puma/const' require 'puma/events' require 'puma/null_io' -require 'puma/compat' require 'puma/reactor' require 'puma/client' require 'puma/binder' -require 'puma/delegation' -require 'puma/accept_nonblock' require 'puma/util' +require 'puma/io_buffer' +require 'puma/request' -require 'puma/puma_http11' - -unless Puma.const_defined? "IOBuffer" - require 'puma/io_buffer' -end - require 'socket' +require 'io/wait' +require 'forwardable' module Puma # The HTTP Server itself. Serves out a single Rack app. # # This class is used by the `Puma::Single` and `Puma::Cluster` classes # to generate one or more `Puma::Server` instances capable of handling requests. - # Each Puma process will contain one `Puma::Server` instacne. + # Each Puma process will contain one `Puma::Server` instance. # # The `Puma::Server` instance pulls requests from the socket, adds them to a # `Puma::Reactor` where they get eventually passed to a `Puma::ThreadPool`. # # Each `Puma::Server` will have one reactor and one thread pool. class Server include Puma::Const - extend Puma::Delegation + include Request + extend Forwardable attr_reader :thread attr_reader :events + attr_reader :min_threads, :max_threads # for #stats + attr_reader :requests_count # @version 5.0.0 + + # @todo the following may be deprecated in the future + attr_reader :auto_trim_time, :early_hints, :first_data_timeout, + :leak_stack_on_error, + :persistent_timeout, :reaping_time + + # @deprecated v6.0.0 + attr_writer :auto_trim_time, :early_hints, :first_data_timeout, + :leak_stack_on_error, :min_threads, :max_threads, + :persistent_timeout, :reaping_time + attr_accessor :app + attr_accessor :binder - attr_accessor :min_threads - attr_accessor :max_threads - attr_accessor :persistent_timeout - attr_accessor :auto_trim_time - attr_accessor :reaping_time - attr_accessor :first_data_timeout + def_delegators :@binder, :add_tcp_listener, :add_ssl_listener, + :add_unix_listener, :connected_ports + ThreadLocalKey = :puma_server + # Create a server for the rack app +app+. # # +events+ is an object which will be called when certain error events occur # to be handled. See Puma::Events for the list of current methods to implement. # # Server#run returns a thread that you can join on to wait for the server # to do its work. # + # @note Several instance variables exist so they are available for testing, + # and have default values set via +fetch+. Normally the values are set via + # `::Puma::Configuration.puma_default_options`. + # def initialize(app, events=Events.stdio, options={}) @app = app @events = events - @check, @notify = Puma::Util.pipe - + @check, @notify = nil @status = :stop - @min_threads = 0 - @max_threads = 16 @auto_trim_time = 30 @reaping_time = 1 @thread = nil @thread_pool = nil - @early_hints = nil - @persistent_timeout = options.fetch(:persistent_timeout, PERSISTENT_TIMEOUT) - @first_data_timeout = options.fetch(:first_data_timeout, FIRST_DATA_TIMEOUT) + @options = options - @binder = Binder.new(events) - @own_binder = true + @early_hints = options.fetch :early_hints, nil + @first_data_timeout = options.fetch :first_data_timeout, FIRST_DATA_TIMEOUT + @min_threads = options.fetch :min_threads, 0 + @max_threads = options.fetch :max_threads , (Puma.mri? ? 5 : 16) + @persistent_timeout = options.fetch :persistent_timeout, PERSISTENT_TIMEOUT + @queue_requests = options.fetch :queue_requests, true + @max_fast_inline = options.fetch :max_fast_inline, MAX_FAST_INLINE + @io_selector_backend = options.fetch :io_selector_backend, :auto - @leak_stack_on_error = true + temp = !!(@options[:environment] =~ /\A(development|test)\z/) + @leak_stack_on_error = @options[:environment] ? temp : true - @options = options - @queue_requests = options[:queue_requests].nil? ? true : options[:queue_requests] + @binder = Binder.new(events) ENV['RACK_ENV'] ||= "development" @mode = :http @precheck_closing = true + + @requests_count = 0 end - attr_accessor :binder, :leak_stack_on_error, :early_hints - - forward :add_tcp_listener, :@binder - forward :add_ssl_listener, :@binder - forward :add_unix_listener, :@binder - forward :connected_port, :@binder - def inherit_binder(bind) @binder = bind - @own_binder = false end - def tcp_mode! - @mode = :tcp + class << self + # @!attribute [r] current + def current + Thread.current[ThreadLocalKey] + end + + # :nodoc: + # @version 5.0.0 + def tcp_cork_supported? + Socket.const_defined?(:TCP_CORK) && Socket.const_defined?(:IPPROTO_TCP) + end + + # :nodoc: + # @version 5.0.0 + def closed_socket_supported? + Socket.const_defined?(:TCP_INFO) && Socket.const_defined?(:IPPROTO_TCP) + end + private :tcp_cork_supported? + private :closed_socket_supported? end # On Linux, use TCP_CORK to better control how the TCP stack # packetizes our stream. This improves both latency and throughput. + # socket parameter may be an MiniSSL::Socket, so use to_io # - if RUBY_PLATFORM =~ /linux/ - UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze - + if tcp_cork_supported? # 6 == Socket::IPPROTO_TCP # 3 == TCP_CORK # 1/0 == turn on/off def cork_socket(socket) + skt = socket.to_io begin - socket.setsockopt(6, 3, 1) if socket.kind_of? TCPSocket + skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1) if skt.kind_of? TCPSocket rescue IOError, SystemCallError Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue end end def uncork_socket(socket) + skt = socket.to_io begin - socket.setsockopt(6, 3, 0) if socket.kind_of? TCPSocket + skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0) if skt.kind_of? TCPSocket rescue IOError, SystemCallError Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue end end + else + def cork_socket(socket) + end + def uncork_socket(socket) + end + end + + if closed_socket_supported? + UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze + def closed_socket?(socket) - return false unless socket.kind_of? TCPSocket - return false unless @precheck_closing + skt = socket.to_io + return false unless skt.kind_of?(TCPSocket) && @precheck_closing begin - tcp_info = socket.getsockopt(Socket::SOL_TCP, Socket::TCP_INFO) + tcp_info = skt.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO) rescue IOError, SystemCallError Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue @precheck_closing = false false else @@ -149,25 +184,21 @@ # TIME_WAIT: 6, CLOSE: 7, CLOSE_WAIT: 8, LAST_ACK: 9, CLOSING: 11 (state >= 6 && state <= 9) || state == 11 end end else - def cork_socket(socket) - end - - def uncork_socket(socket) - end - def closed_socket?(socket) false end end + # @!attribute [r] backlog def backlog @thread_pool and @thread_pool.backlog end + # @!attribute [r] running def running @thread_pool and @thread_pool.spawned end @@ -176,202 +207,115 @@ # # For example if the number is 5 then it means # there are 5 threads sitting idle ready to take # a request. If one request comes in, then the # value would be 4 until it finishes processing. + # @!attribute [r] pool_capacity def pool_capacity @thread_pool and @thread_pool.pool_capacity end - # Lopez Mode == raw tcp apps - - def run_lopez_mode(background=true) - @thread_pool = ThreadPool.new(@min_threads, - @max_threads, - Hash) do |client, tl| - - io = client.to_io - addr = io.peeraddr.last - - if addr.empty? - # Set unix socket addrs to localhost - addr = "127.0.0.1:0" - else - addr = "#{addr}:#{io.peeraddr[1]}" - end - - env = { 'thread' => tl, REMOTE_ADDR => addr } - - begin - @app.call env, client.to_io - rescue Object => e - STDERR.puts "! Detected exception at toplevel: #{e.message} (#{e.class})" - STDERR.puts e.backtrace - end - - client.close unless env['detach'] - end - - @events.fire :state, :running - - if background - @thread = Thread.new { handle_servers_lopez_mode } - return @thread - else - handle_servers_lopez_mode - end - end - - def handle_servers_lopez_mode - begin - check = @check - sockets = [check] + @binder.ios - pool = @thread_pool - - while @status == :run - begin - ios = IO.select sockets - ios.first.each do |sock| - if sock == check - break if handle_check - else - begin - if io = sock.accept_nonblock - client = Client.new io, nil - pool << client - end - rescue SystemCallError - # nothing - rescue Errno::ECONNABORTED - # client closed the socket even before accept - begin - io.close - rescue - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - end - end - end - end - rescue Object => e - @events.unknown_error self, e, "Listen loop" - end - end - - @events.fire :state, @status - - graceful_shutdown if @status == :stop || @status == :restart - - rescue Exception => e - STDERR.puts "Exception handling servers: #{e.message} (#{e.class})" - STDERR.puts e.backtrace - ensure - begin - @check.close - rescue - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - end - - @notify.close - - if @status != :restart and @own_binder - @binder.close - end - end - - @events.fire :state, :done - end # Runs the server. # # If +background+ is true (the default) then a thread is spun # up in the background to handle requests. Otherwise requests # are handled synchronously. # - def run(background=true) + def run(background=true, thread_name: 'server') BasicSocket.do_not_reverse_lookup = true @events.fire :state, :booting @status = :run - if @mode == :tcp - return run_lopez_mode(background) - end + @thread_pool = ThreadPool.new( + thread_name, + @min_threads, + @max_threads, + ::Puma::IOBuffer, + &method(:process_client) + ) - queue_requests = @queue_requests - - @thread_pool = ThreadPool.new(@min_threads, - @max_threads, - IOBuffer) do |client, buffer| - - # Advertise this server into the thread - Thread.current[ThreadLocalKey] = self - - process_now = false - - begin - if queue_requests - process_now = client.eagerly_finish - else - client.finish - process_now = true - end - rescue MiniSSL::SSLError => e - ssl_socket = client.io - addr = ssl_socket.peeraddr.last - cert = ssl_socket.peercert - - client.close - - @events.ssl_error self, addr, cert, e - rescue HttpParserError => e - client.write_400 - client.close - - @events.parse_error self, client.env, e - rescue ConnectionError, EOFError - client.close - else - if process_now - process_client client, buffer - else - client.set_timeout @first_data_timeout - @reactor.add client - end - end - end - + @thread_pool.out_of_band_hook = @options[:out_of_band] @thread_pool.clean_thread_locals = @options[:clean_thread_locals] - if queue_requests - @reactor = Reactor.new self, @thread_pool - @reactor.run_in_thread + if @queue_requests + @reactor = Reactor.new(@io_selector_backend, &method(:reactor_wakeup)) + @reactor.run end if @reaping_time @thread_pool.auto_reap!(@reaping_time) end if @auto_trim_time @thread_pool.auto_trim!(@auto_trim_time) end + @check, @notify = Puma::Util.pipe unless @notify + @events.fire :state, :running if background - @thread = Thread.new { handle_servers } + @thread = Thread.new do + Puma.set_thread_name thread_name + handle_servers + end return @thread else handle_servers end end + # This method is called from the Reactor thread when a queued Client receives data, + # times out, or when the Reactor is shutting down. + # + # It is responsible for ensuring that a request has been completely received + # before it starts to be processed by the ThreadPool. This may be known as read buffering. + # If read buffering is not done, and no other read buffering is performed (such as by an application server + # such as nginx) then the application would be subject to a slow client attack. + # + # For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline). + # + # The method checks to see if it has the full header and body with + # the `Puma::Client#try_to_finish` method. If the full request has been sent, + # then the request is passed to the ThreadPool (`@thread_pool << client`) + # so that a "worker thread" can pick up the request and begin to execute application logic. + # The Client is then removed from the reactor (return `true`). + # + # If a client object times out, a 408 response is written, its connection is closed, + # and the object is removed from the reactor (return `true`). + # + # If the Reactor is shutting down, all Clients are either timed out or passed to the + # ThreadPool, depending on their current state (#can_close?). + # + # Otherwise, if the full request is not ready then the client will remain in the reactor + # (return `false`). When the client sends more data to the socket the `Puma::Client` object + # will wake up and again be checked to see if it's ready to be passed to the thread pool. + def reactor_wakeup(client) + shutdown = !@queue_requests + if client.try_to_finish || (shutdown && !client.can_close?) + @thread_pool << client + elsif shutdown || client.timeout == 0 + client.timeout! + else + client.set_timeout(@first_data_timeout) + false + end + rescue StandardError => e + client_error(e, client) + client.close + true + end + def handle_servers begin check = @check sockets = [check] + @binder.ios pool = @thread_pool queue_requests = @queue_requests + drain = @options[:drain_on_shutdown] ? 0 : nil remote_addr_value = nil remote_addr_header = nil case @options[:remote_address] @@ -379,63 +323,62 @@ remote_addr_value = @options[:remote_address_value] when :header remote_addr_header = @options[:remote_address_header] end - while @status == :run + while @status == :run || (drain && shutting_down?) begin - ios = IO.select sockets + ios = IO.select sockets, nil, nil, (shutting_down? ? 0 : nil) + break unless ios ios.first.each do |sock| if sock == check break if handle_check else - begin - if io = sock.accept_nonblock - client = Client.new io, @binder.env(sock) - if remote_addr_value - client.peerip = remote_addr_value - elsif remote_addr_header - client.remote_addr_header = remote_addr_header - end + pool.wait_until_not_full + pool.wait_for_less_busy_worker(@options[:wait_for_less_busy_worker]) - pool << client - pool.wait_until_not_full - end - rescue SystemCallError - # nothing - rescue Errno::ECONNABORTED - # client closed the socket even before accept - begin - io.close - rescue - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - end + io = begin + sock.accept_nonblock + rescue IO::WaitReadable + next end + drain += 1 if shutting_down? + client = Client.new io, @binder.env(sock) + client.listener = sock + if remote_addr_value + client.peerip = remote_addr_value + elsif remote_addr_header + client.remote_addr_header = remote_addr_header + end + pool << client end end rescue Object => e - @events.unknown_error self, e, "Listen loop" + @events.unknown_error e, nil, "Listen loop" end end + @events.debug "Drained #{drain} additional connections." if drain @events.fire :state, @status - graceful_shutdown if @status == :stop || @status == :restart if queue_requests - @reactor.clear! + @queue_requests = false @reactor.shutdown end + graceful_shutdown if @status == :stop || @status == :restart rescue Exception => e - STDERR.puts "Exception handling servers: #{e.message} (#{e.class})" - STDERR.puts e.backtrace + @events.unknown_error e, nil, "Exception handling servers" ensure - @check.close - @notify.close - - if @status != :restart and @own_binder - @binder.close + begin + @check.close unless @check.closed? + rescue Errno::EBADF, RuntimeError + # RuntimeError is Ruby 2.2 issue, can't modify frozen IOError + # Errno::EBADF is infrequently raised end + @notify.close + @notify = nil + @check = nil end @events.fire :state, :done end @@ -453,502 +396,150 @@ when RESTART_COMMAND @status = :restart return true end - return false + false end - # Given a connection on +client+, handle the incoming requests. + # Given a connection on +client+, handle the incoming requests, + # or queue the connection in the Reactor if no request is available. # - # This method support HTTP Keep-Alive so it may, depending on if the client + # This method is called from a ThreadPool worker thread. + # + # This method supports HTTP Keep-Alive so it may, depending on if the client # indicates that it supports keep alive, wait for another request before # returning. # + # Return true if one or more requests were processed. def process_client(client, buffer) + # Advertise this server into the thread + Thread.current[ThreadLocalKey] = self + + clean_thread_locals = @options[:clean_thread_locals] + close_socket = true + + requests = 0 + begin + if @queue_requests && + !client.eagerly_finish - clean_thread_locals = @options[:clean_thread_locals] - close_socket = true + client.set_timeout(@first_data_timeout) + if @reactor.add client + close_socket = false + return false + end + end - requests = 0 + with_force_shutdown(client) do + client.finish(@first_data_timeout) + end while true - case handle_request(client, buffer) + @requests_count += 1 + case handle_request(client, buffer, requests + 1) when false - return + break when :async close_socket = false - return + break when true - return unless @queue_requests buffer.reset ThreadPool.clean_thread_locals if clean_thread_locals requests += 1 - check_for_more_data = @status == :run + # As an optimization, try to read the next request from the + # socket for a short time before returning to the reactor. + fast_check = @status == :run - if requests >= MAX_FAST_INLINE - # This will mean that reset will only try to use the data it already - # has buffered and won't try to read more data. What this means is that - # every client, independent of their request speed, gets treated like a slow - # one once every MAX_FAST_INLINE requests. - check_for_more_data = false + # Always pass the client back to the reactor after a reasonable + # number of inline requests if there are other requests pending. + fast_check = false if requests >= @max_fast_inline && + @thread_pool.backlog > 0 + + next_request_ready = with_force_shutdown(client) do + client.reset(fast_check) end - unless client.reset(check_for_more_data) - close_socket = false + unless next_request_ready + break unless @queue_requests client.set_timeout @persistent_timeout - @reactor.add client - return + if @reactor.add client + close_socket = false + break + end end end end - - # The client disconnected while we were reading data - rescue ConnectionError - # Swallow them. The ensure tries to close +client+ down - - # SSL handshake error - rescue MiniSSL::SSLError => e - lowlevel_error(e, client.env) - - ssl_socket = client.io - addr = ssl_socket.peeraddr.last - cert = ssl_socket.peercert - - close_socket = true - - @events.ssl_error self, addr, cert, e - - # The client doesn't know HTTP well - rescue HttpParserError => e - lowlevel_error(e, client.env) - - client.write_400 - - @events.parse_error self, client.env, e - - # Server error + true rescue StandardError => e - lowlevel_error(e, client.env) - - client.write_500 - - @events.unknown_error self, e, "Read" - + client_error(e, client) + # The ensure tries to close +client+ down + requests > 0 ensure buffer.reset begin client.close if close_socket rescue IOError, SystemCallError Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue # Already closed rescue StandardError => e - @events.unknown_error self, e, "Client" + @events.unknown_error e, nil, "Client" end end end - # Given a Hash +env+ for the request read from +client+, add - # and fixup keys to comply with Rack's env guidelines. - # - def normalize_env(env, client) - if host = env[HTTP_HOST] - if colon = host.index(":") - env[SERVER_NAME] = host[0, colon] - env[SERVER_PORT] = host[colon+1, host.bytesize] - else - env[SERVER_NAME] = host - env[SERVER_PORT] = default_server_port(env) - end - else - env[SERVER_NAME] = LOCALHOST - env[SERVER_PORT] = default_server_port(env) - end - - unless env[REQUEST_PATH] - # it might be a dumbass full host request header - uri = URI.parse(env[REQUEST_URI]) - env[REQUEST_PATH] = uri.path - - raise "No REQUEST PATH" unless env[REQUEST_PATH] - - # A nil env value will cause a LintError (and fatal errors elsewhere), - # so only set the env value if there actually is a value. - env[QUERY_STRING] = uri.query if uri.query - end - - env[PATH_INFO] = env[REQUEST_PATH] - - # From http://www.ietf.org/rfc/rfc3875 : - # "Script authors should be aware that the REMOTE_ADDR and - # REMOTE_HOST meta-variables (see sections 4.1.8 and 4.1.9) - # may not identify the ultimate source of the request. - # They identify the client for the immediate request to the - # server; that client may be a proxy, gateway, or other - # intermediary acting on behalf of the actual source client." - # - - unless env.key?(REMOTE_ADDR) - begin - addr = client.peerip - rescue Errno::ENOTCONN - # Client disconnects can result in an inability to get the - # peeraddr from the socket; default to localhost. - addr = LOCALHOST_IP - end - - # Set unix socket addrs to localhost - addr = LOCALHOST_IP if addr.empty? - - env[REMOTE_ADDR] = addr - end + # Triggers a client timeout if the thread-pool shuts down + # during execution of the provided block. + def with_force_shutdown(client, &block) + @thread_pool.with_force_shutdown(&block) + rescue ThreadPool::ForceShutdown + client.timeout! end - def default_server_port(env) - return PORT_443 if env[HTTPS_KEY] == 'on' || env[HTTPS_KEY] == 'https' - env['HTTP_X_FORWARDED_PROTO'] == 'https' ? PORT_443 : PORT_80 - end + # :nocov: - # Given the request +env+ from +client+ and a partial request body - # in +body+, finish reading the body if there is one and invoke - # the rack app. Then construct the response and write it back to - # +client+ - # - # +cl+ is the previously fetched Content-Length header if there - # was one. This is an optimization to keep from having to look - # it up again. - # - def handle_request(req, lines) - env = req.env - client = req.io + # Handle various error types thrown by Client I/O operations. + def client_error(e, client) + # Swallow, do not log + return if [ConnectionError, EOFError].include?(e.class) - return false if closed_socket?(client) - - normalize_env env, req - - env[PUMA_SOCKET] = client - - if env[HTTPS_KEY] && client.peercert - env[PUMA_PEERCERT] = client.peercert - end - - env[HIJACK_P] = true - env[HIJACK] = req - - body = req.body - - head = env[REQUEST_METHOD] == HEAD - - env[RACK_INPUT] = body - env[RACK_URL_SCHEME] = env[HTTPS_KEY] ? HTTPS : HTTP - - if @early_hints - env[EARLY_HINTS] = lambda { |headers| - fast_write client, "HTTP/1.1 103 Early Hints\r\n".freeze - - headers.each_pair do |k, vs| - if vs.respond_to?(:to_s) && !vs.to_s.empty? - vs.to_s.split(NEWLINE).each do |v| - next if possible_header_injection?(v) - fast_write client, "#{k}: #{v}\r\n" - end - else - fast_write client, "#{k}: #{vs}\r\n" - end - end - - fast_write client, "\r\n".freeze - } - end - - # Fixup any headers with , in the name to have _ now. We emit - # headers with , in them during the parse phase to avoid ambiguity - # with the - to _ conversion for critical headers. But here for - # compatibility, we'll convert them back. This code is written to - # avoid allocation in the common case (ie there are no headers - # with , in their names), that's why it has the extra conditionals. - - to_delete = nil - to_add = nil - - env.each do |k,v| - if k.start_with?("HTTP_") and k.include?(",") and k != "HTTP_TRANSFER,ENCODING" - if to_delete - to_delete << k - else - to_delete = [k] - end - - unless to_add - to_add = {} - end - - to_add[k.gsub(",", "_")] = v - end - end - - if to_delete - to_delete.each { |k| env.delete(k) } - env.merge! to_add - end - - # A rack extension. If the app writes #call'ables to this - # array, we will invoke them when the request is done. - # - after_reply = env[RACK_AFTER_REPLY] = [] - - begin - begin - status, headers, res_body = @app.call(env) - - return :async if req.hijacked - - status = status.to_i - - if status == -1 - unless headers.empty? and res_body == [] - raise "async response must have empty headers and body" - end - - return :async - end - rescue ThreadPool::ForceShutdown => e - @events.log "Detected force shutdown of a thread, returning 503" - @events.unknown_error self, e, "Rack app" - - status = 503 - headers = {} - res_body = ["Request was internally terminated early\n"] - - rescue Exception => e - @events.unknown_error self, e, "Rack app", env - - status, headers, res_body = lowlevel_error(e, env) - end - - content_length = nil - no_body = head - - if res_body.kind_of? Array and res_body.size == 1 - content_length = res_body[0].bytesize - end - - cork_socket client - - line_ending = LINE_END - colon = COLON - - http_11 = if env[HTTP_VERSION] == HTTP_11 - allow_chunked = true - keep_alive = env.fetch(HTTP_CONNECTION, "").downcase != CLOSE - include_keepalive_header = false - - # An optimization. The most common response is 200, so we can - # reply with the proper 200 status without having to compute - # the response header. - # - if status == 200 - lines << HTTP_11_200 - else - lines.append "HTTP/1.1 ", status.to_s, " ", - fetch_status_code(status), line_ending - - no_body ||= status < 200 || STATUS_WITH_NO_ENTITY_BODY[status] - end - true - else - allow_chunked = false - keep_alive = env.fetch(HTTP_CONNECTION, "").downcase == KEEP_ALIVE - include_keepalive_header = keep_alive - - # Same optimization as above for HTTP/1.1 - # - if status == 200 - lines << HTTP_10_200 - else - lines.append "HTTP/1.0 ", status.to_s, " ", - fetch_status_code(status), line_ending - - no_body ||= status < 200 || STATUS_WITH_NO_ENTITY_BODY[status] - end - false - end - - response_hijack = nil - - headers.each do |k, vs| - case k.downcase - when CONTENT_LENGTH2 - next if possible_header_injection?(vs) - content_length = vs - next - when TRANSFER_ENCODING - allow_chunked = false - content_length = nil - when HIJACK - response_hijack = vs - next - end - - if vs.respond_to?(:to_s) && !vs.to_s.empty? - vs.to_s.split(NEWLINE).each do |v| - next if possible_header_injection?(v) - lines.append k, colon, v, line_ending - end - else - lines.append k, colon, line_ending - end - end - - if include_keepalive_header - lines << CONNECTION_KEEP_ALIVE - elsif http_11 && !keep_alive - lines << CONNECTION_CLOSE - end - - if no_body - if content_length and status != 204 - lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending - end - - lines << line_ending - fast_write client, lines.to_s - return keep_alive - end - - if content_length - lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending - chunked = false - elsif !response_hijack and allow_chunked - lines << TRANSFER_ENCODING_CHUNKED - chunked = true - end - - lines << line_ending - - fast_write client, lines.to_s - - if response_hijack - response_hijack.call client - return :async - end - - begin - res_body.each do |part| - next if part.bytesize.zero? - if chunked - fast_write client, part.bytesize.to_s(16) - fast_write client, line_ending - fast_write client, part - fast_write client, line_ending - else - fast_write client, part - end - - client.flush - end - - if chunked - fast_write client, CLOSE_CHUNKED - client.flush - end - rescue SystemCallError, IOError - raise ConnectionError, "Connection error detected during write" - end - - ensure - uncork_socket client - - body.close - req.tempfile.unlink if req.tempfile - res_body.close if res_body.respond_to? :close - - after_reply.each { |o| o.call } - end - - return keep_alive - end - - def fetch_status_code(status) - HTTP_STATUS_CODES.fetch(status) { 'CUSTOM' } - end - private :fetch_status_code - - # Given the request +env+ from +client+ and the partial body +body+ - # plus a potential Content-Length value +cl+, finish reading - # the body and return it. - # - # If the body is larger than MAX_BODY, a Tempfile object is used - # for the body, otherwise a StringIO is used. - # - def read_body(env, client, body, cl) - content_length = cl.to_i - - remain = content_length - body.bytesize - - return StringIO.new(body) if remain <= 0 - - # Use a Tempfile if there is a lot of data left - if remain > MAX_BODY - stream = Tempfile.new(Const::PUMA_TMP_BASE) - stream.binmode + lowlevel_error(e, client.env) + case e + when MiniSSL::SSLError + @events.ssl_error e, client.io + when HttpParserError + client.write_error(400) + @events.parse_error e, client else - # The body[0,0] trick is to get an empty string in the same - # encoding as body. - stream = StringIO.new body[0,0] + client.write_error(500) + @events.unknown_error e, nil, "Read" end - - stream.write body - - # Read an odd sized chunk so we can read even sized ones - # after this - chunk = client.readpartial(remain % CHUNK_SIZE) - - # No chunk means a closed socket - unless chunk - stream.close - return nil - end - - remain -= stream.write(chunk) - - # Raed the rest of the chunks - while remain > 0 - chunk = client.readpartial(CHUNK_SIZE) - unless chunk - stream.close - return nil - end - - remain -= stream.write(chunk) - end - - stream.rewind - - return stream end # A fallback rack response if +@app+ raises as exception. # - def lowlevel_error(e, env) + def lowlevel_error(e, env, status=500) if handler = @options[:lowlevel_error_handler] if handler.arity == 1 return handler.call(e) - else + elsif handler.arity == 2 return handler.call(e, env) + else + return handler.call(e, env, status) end end if @leak_stack_on_error - [500, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{e.backtrace.join("\n")}"]] + backtrace = e.backtrace.nil? ? '<no backtrace available>' : e.backtrace.join("\n") + [status, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{backtrace}"]] else - [500, {}, ["An unhandled lowlevel error occurred. The application logs may have details.\n"]] + [status, {}, ["An unhandled lowlevel error occurred. The application logs may have details.\n"]] end end # Wait for all outstanding requests to finish. # @@ -966,54 +557,34 @@ $stdout.syswrite "#{pid}: #{t.backtrace.join("\n#{pid}: ")}\n\n" end $stdout.syswrite "#{pid}: === End thread backtrace dump ===\n" end - if @options[:drain_on_shutdown] - count = 0 - - while true - ios = IO.select @binder.ios, nil, nil, 0 - break unless ios - - ios.first.each do |sock| - begin - if io = sock.accept_nonblock - count += 1 - client = Client.new io, @binder.env(sock) - @thread_pool << client - end - rescue SystemCallError - end - end - end - - @events.debug "Drained #{count} additional connections." + if @status != :restart + @binder.close end if @thread_pool if timeout = @options[:force_shutdown_after] - @thread_pool.shutdown timeout.to_i + @thread_pool.shutdown timeout.to_f else @thread_pool.shutdown end end end def notify_safely(message) - begin - @notify << message - rescue IOError - # The server, in another thread, is shutting down + @notify << message + rescue IOError, NoMethodError, Errno::EPIPE + # The server, in another thread, is shutting down + Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue + rescue RuntimeError => e + # Temporary workaround for https://bugs.ruby-lang.org/issues/13239 + if e.message.include?('IOError') Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - rescue RuntimeError => e - # Temporary workaround for https://bugs.ruby-lang.org/issues/13239 - if e.message.include?('IOError') - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - else - raise e - end + else + raise e end end private :notify_safely # Stops the acceptor thread and then causes the worker threads to finish @@ -1027,46 +598,26 @@ def halt(sync=false) notify_safely(HALT_COMMAND) @thread.join if @thread && sync end - def begin_restart + def begin_restart(sync=false) notify_safely(RESTART_COMMAND) + @thread.join if @thread && sync end - def fast_write(io, str) - n = 0 - while true - begin - n = io.syswrite str - rescue Errno::EAGAIN, Errno::EWOULDBLOCK - if !IO.select(nil, [io], nil, WRITE_TIMEOUT) - raise ConnectionError, "Socket timeout writing data" - end - - retry - rescue Errno::EPIPE, SystemCallError, IOError - raise ConnectionError, "Socket timeout writing data" - end - - return if n == str.bytesize - str = str.byteslice(n..-1) - end - end - private :fast_write - - ThreadLocalKey = :puma_server - - def self.current - Thread.current[ThreadLocalKey] - end - def shutting_down? @status == :stop || @status == :restart end - def possible_header_injection?(header_value) - HTTP_INJECTION_REGEX =~ header_value.to_s + # List of methods invoked by #stats. + # @version 5.0.0 + STAT_METHODS = [:backlog, :running, :pool_capacity, :max_threads, :requests_count].freeze + + # Returns a hash of stats about the running server for reporting purposes. + # @version 5.0.0 + # @!attribute [r] stats + def stats + STAT_METHODS.map {|name| [name, send(name) || 0]}.to_h end - private :possible_header_injection? end end