lib/puma/server.rb in piesync-puma-3.12.6.1 vs lib/puma/server.rb in piesync-puma-5.4.0.1
- old
+ new
@@ -4,144 +4,179 @@
require 'puma/thread_pool'
require 'puma/const'
require 'puma/events'
require 'puma/null_io'
-require 'puma/compat'
require 'puma/reactor'
require 'puma/client'
require 'puma/binder'
-require 'puma/delegation'
-require 'puma/accept_nonblock'
require 'puma/util'
+require 'puma/io_buffer'
+require 'puma/request'
-require 'puma/puma_http11'
-
-unless Puma.const_defined? "IOBuffer"
- require 'puma/io_buffer'
-end
-
require 'socket'
+require 'io/wait'
+require 'forwardable'
module Puma
# The HTTP Server itself. Serves out a single Rack app.
#
# This class is used by the `Puma::Single` and `Puma::Cluster` classes
# to generate one or more `Puma::Server` instances capable of handling requests.
- # Each Puma process will contain one `Puma::Server` instacne.
+ # Each Puma process will contain one `Puma::Server` instance.
#
# The `Puma::Server` instance pulls requests from the socket, adds them to a
# `Puma::Reactor` where they get eventually passed to a `Puma::ThreadPool`.
#
# Each `Puma::Server` will have one reactor and one thread pool.
class Server
include Puma::Const
- extend Puma::Delegation
+ include Request
+ extend Forwardable
attr_reader :thread
attr_reader :events
+ attr_reader :min_threads, :max_threads # for #stats
+ attr_reader :requests_count # @version 5.0.0
+
+ # @todo the following may be deprecated in the future
+ attr_reader :auto_trim_time, :early_hints, :first_data_timeout,
+ :leak_stack_on_error,
+ :persistent_timeout, :reaping_time
+
+ # @deprecated v6.0.0
+ attr_writer :auto_trim_time, :early_hints, :first_data_timeout,
+ :leak_stack_on_error, :min_threads, :max_threads,
+ :persistent_timeout, :reaping_time
+
attr_accessor :app
+ attr_accessor :binder
- attr_accessor :min_threads
- attr_accessor :max_threads
- attr_accessor :persistent_timeout
- attr_accessor :auto_trim_time
- attr_accessor :reaping_time
- attr_accessor :first_data_timeout
+ def_delegators :@binder, :add_tcp_listener, :add_ssl_listener,
+ :add_unix_listener, :connected_ports
+ ThreadLocalKey = :puma_server
+
# Create a server for the rack app +app+.
#
# +events+ is an object which will be called when certain error events occur
# to be handled. See Puma::Events for the list of current methods to implement.
#
# Server#run returns a thread that you can join on to wait for the server
# to do its work.
#
+ # @note Several instance variables exist so they are available for testing,
+ # and have default values set via +fetch+. Normally the values are set via
+ # `::Puma::Configuration.puma_default_options`.
+ #
def initialize(app, events=Events.stdio, options={})
@app = app
@events = events
- @check, @notify = Puma::Util.pipe
-
+ @check, @notify = nil
@status = :stop
- @min_threads = 0
- @max_threads = 16
@auto_trim_time = 30
@reaping_time = 1
@thread = nil
@thread_pool = nil
- @early_hints = nil
- @persistent_timeout = options.fetch(:persistent_timeout, PERSISTENT_TIMEOUT)
- @first_data_timeout = options.fetch(:first_data_timeout, FIRST_DATA_TIMEOUT)
+ @options = options
- @binder = Binder.new(events)
- @own_binder = true
+ @early_hints = options.fetch :early_hints, nil
+ @first_data_timeout = options.fetch :first_data_timeout, FIRST_DATA_TIMEOUT
+ @min_threads = options.fetch :min_threads, 0
+ @max_threads = options.fetch :max_threads , (Puma.mri? ? 5 : 16)
+ @persistent_timeout = options.fetch :persistent_timeout, PERSISTENT_TIMEOUT
+ @queue_requests = options.fetch :queue_requests, true
+ @max_fast_inline = options.fetch :max_fast_inline, MAX_FAST_INLINE
+ @io_selector_backend = options.fetch :io_selector_backend, :auto
- @leak_stack_on_error = true
+ temp = !!(@options[:environment] =~ /\A(development|test)\z/)
+ @leak_stack_on_error = @options[:environment] ? temp : true
- @options = options
- @queue_requests = options[:queue_requests].nil? ? true : options[:queue_requests]
+ @binder = Binder.new(events)
ENV['RACK_ENV'] ||= "development"
@mode = :http
@precheck_closing = true
+
+ @requests_count = 0
end
- attr_accessor :binder, :leak_stack_on_error, :early_hints
-
- forward :add_tcp_listener, :@binder
- forward :add_ssl_listener, :@binder
- forward :add_unix_listener, :@binder
- forward :connected_port, :@binder
-
def inherit_binder(bind)
@binder = bind
- @own_binder = false
end
- def tcp_mode!
- @mode = :tcp
+ class << self
+ # @!attribute [r] current
+ def current
+ Thread.current[ThreadLocalKey]
+ end
+
+ # :nodoc:
+ # @version 5.0.0
+ def tcp_cork_supported?
+ Socket.const_defined?(:TCP_CORK) && Socket.const_defined?(:IPPROTO_TCP)
+ end
+
+ # :nodoc:
+ # @version 5.0.0
+ def closed_socket_supported?
+ Socket.const_defined?(:TCP_INFO) && Socket.const_defined?(:IPPROTO_TCP)
+ end
+ private :tcp_cork_supported?
+ private :closed_socket_supported?
end
# On Linux, use TCP_CORK to better control how the TCP stack
# packetizes our stream. This improves both latency and throughput.
+ # socket parameter may be a MiniSSL::Socket, so use to_io
#
- if RUBY_PLATFORM =~ /linux/
- UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze
-
+ if tcp_cork_supported?
# 6 == Socket::IPPROTO_TCP
# 3 == TCP_CORK
# 1/0 == turn on/off
def cork_socket(socket)
+ skt = socket.to_io
begin
- socket.setsockopt(6, 3, 1) if socket.kind_of? TCPSocket
+ skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1) if skt.kind_of? TCPSocket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
end
def uncork_socket(socket)
+ skt = socket.to_io
begin
- socket.setsockopt(6, 3, 0) if socket.kind_of? TCPSocket
+ skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0) if skt.kind_of? TCPSocket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
end
+ else
+ def cork_socket(socket)
+ end
+ def uncork_socket(socket)
+ end
+ end
+
+ if closed_socket_supported?
+ UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze
+
def closed_socket?(socket)
- return false unless socket.kind_of? TCPSocket
- return false unless @precheck_closing
+ skt = socket.to_io
+ return false unless skt.kind_of?(TCPSocket) && @precheck_closing
begin
- tcp_info = socket.getsockopt(Socket::SOL_TCP, Socket::TCP_INFO)
+ tcp_info = skt.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO)
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
@precheck_closing = false
false
else
@@ -149,25 +184,21 @@
# TIME_WAIT: 6, CLOSE: 7, CLOSE_WAIT: 8, LAST_ACK: 9, CLOSING: 11
(state >= 6 && state <= 9) || state == 11
end
end
else
- def cork_socket(socket)
- end
-
- def uncork_socket(socket)
- end
-
def closed_socket?(socket)
false
end
end
+ # @!attribute [r] backlog
def backlog
@thread_pool and @thread_pool.backlog
end
+ # @!attribute [r] running
def running
@thread_pool and @thread_pool.spawned
end
@@ -176,202 +207,115 @@
#
# For example if the number is 5 then it means
# there are 5 threads sitting idle ready to take
# a request. If one request comes in, then the
# value would be 4 until it finishes processing.
+ # @!attribute [r] pool_capacity
def pool_capacity
@thread_pool and @thread_pool.pool_capacity
end
- # Lopez Mode == raw tcp apps
-
- def run_lopez_mode(background=true)
- @thread_pool = ThreadPool.new(@min_threads,
- @max_threads,
- Hash) do |client, tl|
-
- io = client.to_io
- addr = io.peeraddr.last
-
- if addr.empty?
- # Set unix socket addrs to localhost
- addr = "127.0.0.1:0"
- else
- addr = "#{addr}:#{io.peeraddr[1]}"
- end
-
- env = { 'thread' => tl, REMOTE_ADDR => addr }
-
- begin
- @app.call env, client.to_io
- rescue Object => e
- STDERR.puts "! Detected exception at toplevel: #{e.message} (#{e.class})"
- STDERR.puts e.backtrace
- end
-
- client.close unless env['detach']
- end
-
- @events.fire :state, :running
-
- if background
- @thread = Thread.new { handle_servers_lopez_mode }
- return @thread
- else
- handle_servers_lopez_mode
- end
- end
-
- def handle_servers_lopez_mode
- begin
- check = @check
- sockets = [check] + @binder.ios
- pool = @thread_pool
-
- while @status == :run
- begin
- ios = IO.select sockets
- ios.first.each do |sock|
- if sock == check
- break if handle_check
- else
- begin
- if io = sock.accept_nonblock
- client = Client.new io, nil
- pool << client
- end
- rescue SystemCallError
- # nothing
- rescue Errno::ECONNABORTED
- # client closed the socket even before accept
- begin
- io.close
- rescue
- Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
- end
- end
- end
- end
- rescue Object => e
- @events.unknown_error self, e, "Listen loop"
- end
- end
-
- @events.fire :state, @status
-
- graceful_shutdown if @status == :stop || @status == :restart
-
- rescue Exception => e
- STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
- STDERR.puts e.backtrace
- ensure
- begin
- @check.close
- rescue
- Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
- end
-
- @notify.close
-
- if @status != :restart and @own_binder
- @binder.close
- end
- end
-
- @events.fire :state, :done
- end
# Runs the server.
#
# If +background+ is true (the default) then a thread is spun
# up in the background to handle requests. Otherwise requests
# are handled synchronously.
#
- def run(background=true)
+ def run(background=true, thread_name: 'server')
BasicSocket.do_not_reverse_lookup = true
@events.fire :state, :booting
@status = :run
- if @mode == :tcp
- return run_lopez_mode(background)
- end
+ @thread_pool = ThreadPool.new(
+ thread_name,
+ @min_threads,
+ @max_threads,
+ ::Puma::IOBuffer,
+ &method(:process_client)
+ )
- queue_requests = @queue_requests
-
- @thread_pool = ThreadPool.new(@min_threads,
- @max_threads,
- IOBuffer) do |client, buffer|
-
- # Advertise this server into the thread
- Thread.current[ThreadLocalKey] = self
-
- process_now = false
-
- begin
- if queue_requests
- process_now = client.eagerly_finish
- else
- client.finish
- process_now = true
- end
- rescue MiniSSL::SSLError => e
- ssl_socket = client.io
- addr = ssl_socket.peeraddr.last
- cert = ssl_socket.peercert
-
- client.close
-
- @events.ssl_error self, addr, cert, e
- rescue HttpParserError => e
- client.write_400
- client.close
-
- @events.parse_error self, client.env, e
- rescue ConnectionError, EOFError
- client.close
- else
- if process_now
- process_client client, buffer
- else
- client.set_timeout @first_data_timeout
- @reactor.add client
- end
- end
- end
-
+ @thread_pool.out_of_band_hook = @options[:out_of_band]
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]
- if queue_requests
- @reactor = Reactor.new self, @thread_pool
- @reactor.run_in_thread
+ if @queue_requests
+ @reactor = Reactor.new(@io_selector_backend, &method(:reactor_wakeup))
+ @reactor.run
end
if @reaping_time
@thread_pool.auto_reap!(@reaping_time)
end
if @auto_trim_time
@thread_pool.auto_trim!(@auto_trim_time)
end
+ @check, @notify = Puma::Util.pipe unless @notify
+
@events.fire :state, :running
if background
- @thread = Thread.new { handle_servers }
+ @thread = Thread.new do
+ Puma.set_thread_name thread_name
+ handle_servers
+ end
return @thread
else
handle_servers
end
end
+ # This method is called from the Reactor thread when a queued Client receives data,
+ # times out, or when the Reactor is shutting down.
+ #
+ # It is responsible for ensuring that a request has been completely received
+ # before it starts to be processed by the ThreadPool. This may be known as read buffering.
+ # If read buffering is not done, and no other read buffering is performed (such as by an application server
+ # such as nginx) then the application would be subject to a slow client attack.
+ #
+ # For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
+ #
+ # The method checks to see if it has the full header and body with
+ # the `Puma::Client#try_to_finish` method. If the full request has been sent,
+ # then the request is passed to the ThreadPool (`@thread_pool << client`)
+ # so that a "worker thread" can pick up the request and begin to execute application logic.
+ # The Client is then removed from the reactor (return `true`).
+ #
+ # If a client object times out, a 408 response is written, its connection is closed,
+ # and the object is removed from the reactor (return `true`).
+ #
+ # If the Reactor is shutting down, all Clients are either timed out or passed to the
+ # ThreadPool, depending on their current state (#can_close?).
+ #
+ # Otherwise, if the full request is not ready then the client will remain in the reactor
+ # (return `false`). When the client sends more data to the socket the `Puma::Client` object
+ # will wake up and again be checked to see if it's ready to be passed to the thread pool.
+ def reactor_wakeup(client)
+ shutdown = !@queue_requests
+ if client.try_to_finish || (shutdown && !client.can_close?)
+ @thread_pool << client
+ elsif shutdown || client.timeout == 0
+ client.timeout!
+ else
+ client.set_timeout(@first_data_timeout)
+ false
+ end
+ rescue StandardError => e
+ client_error(e, client)
+ client.close
+ true
+ end
+
def handle_servers
begin
check = @check
sockets = [check] + @binder.ios
pool = @thread_pool
queue_requests = @queue_requests
+ drain = @options[:drain_on_shutdown] ? 0 : nil
remote_addr_value = nil
remote_addr_header = nil
case @options[:remote_address]
@@ -379,63 +323,62 @@
remote_addr_value = @options[:remote_address_value]
when :header
remote_addr_header = @options[:remote_address_header]
end
- while @status == :run
+ while @status == :run || (drain && shutting_down?)
begin
- ios = IO.select sockets
+ ios = IO.select sockets, nil, nil, (shutting_down? ? 0 : nil)
+ break unless ios
ios.first.each do |sock|
if sock == check
break if handle_check
else
- begin
- if io = sock.accept_nonblock
- client = Client.new io, @binder.env(sock)
- if remote_addr_value
- client.peerip = remote_addr_value
- elsif remote_addr_header
- client.remote_addr_header = remote_addr_header
- end
+ pool.wait_until_not_full
+ pool.wait_for_less_busy_worker(@options[:wait_for_less_busy_worker])
- pool << client
- pool.wait_until_not_full
- end
- rescue SystemCallError
- # nothing
- rescue Errno::ECONNABORTED
- # client closed the socket even before accept
- begin
- io.close
- rescue
- Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
- end
+ io = begin
+ sock.accept_nonblock
+ rescue IO::WaitReadable
+ next
end
+ drain += 1 if shutting_down?
+ client = Client.new io, @binder.env(sock)
+ client.listener = sock
+ if remote_addr_value
+ client.peerip = remote_addr_value
+ elsif remote_addr_header
+ client.remote_addr_header = remote_addr_header
+ end
+ pool << client
end
end
rescue Object => e
- @events.unknown_error self, e, "Listen loop"
+ @events.unknown_error e, nil, "Listen loop"
end
end
+ @events.debug "Drained #{drain} additional connections." if drain
@events.fire :state, @status
- graceful_shutdown if @status == :stop || @status == :restart
if queue_requests
- @reactor.clear!
+ @queue_requests = false
@reactor.shutdown
end
+ graceful_shutdown if @status == :stop || @status == :restart
rescue Exception => e
- STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
- STDERR.puts e.backtrace
+ @events.unknown_error e, nil, "Exception handling servers"
ensure
- @check.close
- @notify.close
-
- if @status != :restart and @own_binder
- @binder.close
+ begin
+ @check.close unless @check.closed?
+ rescue Errno::EBADF, RuntimeError
+ # RuntimeError is Ruby 2.2 issue, can't modify frozen IOError
+ # Errno::EBADF is infrequently raised
end
+ @notify.close
+ @notify = nil
+ @check = nil
end
@events.fire :state, :done
end
@@ -453,502 +396,150 @@
when RESTART_COMMAND
@status = :restart
return true
end
- return false
+ false
end
- # Given a connection on +client+, handle the incoming requests.
+ # Given a connection on +client+, handle the incoming requests,
+ # or queue the connection in the Reactor if no request is available.
#
- # This method support HTTP Keep-Alive so it may, depending on if the client
+ # This method is called from a ThreadPool worker thread.
+ #
+ # This method supports HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
+ # Return true if one or more requests were processed.
def process_client(client, buffer)
+ # Advertise this server into the thread
+ Thread.current[ThreadLocalKey] = self
+
+ clean_thread_locals = @options[:clean_thread_locals]
+ close_socket = true
+
+ requests = 0
+
begin
+ if @queue_requests &&
+ !client.eagerly_finish
- clean_thread_locals = @options[:clean_thread_locals]
- close_socket = true
+ client.set_timeout(@first_data_timeout)
+ if @reactor.add client
+ close_socket = false
+ return false
+ end
+ end
- requests = 0
+ with_force_shutdown(client) do
+ client.finish(@first_data_timeout)
+ end
while true
- case handle_request(client, buffer)
+ @requests_count += 1
+ case handle_request(client, buffer, requests + 1)
when false
- return
+ break
when :async
close_socket = false
- return
+ break
when true
- return unless @queue_requests
buffer.reset
ThreadPool.clean_thread_locals if clean_thread_locals
requests += 1
- check_for_more_data = @status == :run
+ # As an optimization, try to read the next request from the
+ # socket for a short time before returning to the reactor.
+ fast_check = @status == :run
- if requests >= MAX_FAST_INLINE
- # This will mean that reset will only try to use the data it already
- # has buffered and won't try to read more data. What this means is that
- # every client, independent of their request speed, gets treated like a slow
- # one once every MAX_FAST_INLINE requests.
- check_for_more_data = false
+ # Always pass the client back to the reactor after a reasonable
+ # number of inline requests if there are other requests pending.
+ fast_check = false if requests >= @max_fast_inline &&
+ @thread_pool.backlog > 0
+
+ next_request_ready = with_force_shutdown(client) do
+ client.reset(fast_check)
end
- unless client.reset(check_for_more_data)
- close_socket = false
+ unless next_request_ready
+ break unless @queue_requests
client.set_timeout @persistent_timeout
- @reactor.add client
- return
+ if @reactor.add client
+ close_socket = false
+ break
+ end
end
end
end
-
- # The client disconnected while we were reading data
- rescue ConnectionError
- # Swallow them. The ensure tries to close +client+ down
-
- # SSL handshake error
- rescue MiniSSL::SSLError => e
- lowlevel_error(e, client.env)
-
- ssl_socket = client.io
- addr = ssl_socket.peeraddr.last
- cert = ssl_socket.peercert
-
- close_socket = true
-
- @events.ssl_error self, addr, cert, e
-
- # The client doesn't know HTTP well
- rescue HttpParserError => e
- lowlevel_error(e, client.env)
-
- client.write_400
-
- @events.parse_error self, client.env, e
-
- # Server error
+ true
rescue StandardError => e
- lowlevel_error(e, client.env)
-
- client.write_500
-
- @events.unknown_error self, e, "Read"
-
+ client_error(e, client)
+ # The ensure tries to close +client+ down
+ requests > 0
ensure
buffer.reset
begin
client.close if close_socket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
# Already closed
rescue StandardError => e
- @events.unknown_error self, e, "Client"
+ @events.unknown_error e, nil, "Client"
end
end
end
- # Given a Hash +env+ for the request read from +client+, add
- # and fixup keys to comply with Rack's env guidelines.
- #
- def normalize_env(env, client)
- if host = env[HTTP_HOST]
- if colon = host.index(":")
- env[SERVER_NAME] = host[0, colon]
- env[SERVER_PORT] = host[colon+1, host.bytesize]
- else
- env[SERVER_NAME] = host
- env[SERVER_PORT] = default_server_port(env)
- end
- else
- env[SERVER_NAME] = LOCALHOST
- env[SERVER_PORT] = default_server_port(env)
- end
-
- unless env[REQUEST_PATH]
- # it might be a dumbass full host request header
- uri = URI.parse(env[REQUEST_URI])
- env[REQUEST_PATH] = uri.path
-
- raise "No REQUEST PATH" unless env[REQUEST_PATH]
-
- # A nil env value will cause a LintError (and fatal errors elsewhere),
- # so only set the env value if there actually is a value.
- env[QUERY_STRING] = uri.query if uri.query
- end
-
- env[PATH_INFO] = env[REQUEST_PATH]
-
- # From http://www.ietf.org/rfc/rfc3875 :
- # "Script authors should be aware that the REMOTE_ADDR and
- # REMOTE_HOST meta-variables (see sections 4.1.8 and 4.1.9)
- # may not identify the ultimate source of the request.
- # They identify the client for the immediate request to the
- # server; that client may be a proxy, gateway, or other
- # intermediary acting on behalf of the actual source client."
- #
-
- unless env.key?(REMOTE_ADDR)
- begin
- addr = client.peerip
- rescue Errno::ENOTCONN
- # Client disconnects can result in an inability to get the
- # peeraddr from the socket; default to localhost.
- addr = LOCALHOST_IP
- end
-
- # Set unix socket addrs to localhost
- addr = LOCALHOST_IP if addr.empty?
-
- env[REMOTE_ADDR] = addr
- end
+ # Triggers a client timeout if the thread-pool shuts down
+ # during execution of the provided block.
+ def with_force_shutdown(client, &block)
+ @thread_pool.with_force_shutdown(&block)
+ rescue ThreadPool::ForceShutdown
+ client.timeout!
end
- def default_server_port(env)
- return PORT_443 if env[HTTPS_KEY] == 'on' || env[HTTPS_KEY] == 'https'
- env['HTTP_X_FORWARDED_PROTO'] == 'https' ? PORT_443 : PORT_80
- end
+ # :nocov:
- # Given the request +env+ from +client+ and a partial request body
- # in +body+, finish reading the body if there is one and invoke
- # the rack app. Then construct the response and write it back to
- # +client+
- #
- # +cl+ is the previously fetched Content-Length header if there
- # was one. This is an optimization to keep from having to look
- # it up again.
- #
- def handle_request(req, lines)
- env = req.env
- client = req.io
+ # Handle various error types thrown by Client I/O operations.
+ def client_error(e, client)
+ # Swallow, do not log
+ return if [ConnectionError, EOFError].include?(e.class)
- return false if closed_socket?(client)
-
- normalize_env env, req
-
- env[PUMA_SOCKET] = client
-
- if env[HTTPS_KEY] && client.peercert
- env[PUMA_PEERCERT] = client.peercert
- end
-
- env[HIJACK_P] = true
- env[HIJACK] = req
-
- body = req.body
-
- head = env[REQUEST_METHOD] == HEAD
-
- env[RACK_INPUT] = body
- env[RACK_URL_SCHEME] = env[HTTPS_KEY] ? HTTPS : HTTP
-
- if @early_hints
- env[EARLY_HINTS] = lambda { |headers|
- fast_write client, "HTTP/1.1 103 Early Hints\r\n".freeze
-
- headers.each_pair do |k, vs|
- if vs.respond_to?(:to_s) && !vs.to_s.empty?
- vs.to_s.split(NEWLINE).each do |v|
- next if possible_header_injection?(v)
- fast_write client, "#{k}: #{v}\r\n"
- end
- else
- fast_write client, "#{k}: #{vs}\r\n"
- end
- end
-
- fast_write client, "\r\n".freeze
- }
- end
-
- # Fixup any headers with , in the name to have _ now. We emit
- # headers with , in them during the parse phase to avoid ambiguity
- # with the - to _ conversion for critical headers. But here for
- # compatibility, we'll convert them back. This code is written to
- # avoid allocation in the common case (ie there are no headers
- # with , in their names), that's why it has the extra conditionals.
-
- to_delete = nil
- to_add = nil
-
- env.each do |k,v|
- if k.start_with?("HTTP_") and k.include?(",") and k != "HTTP_TRANSFER,ENCODING"
- if to_delete
- to_delete << k
- else
- to_delete = [k]
- end
-
- unless to_add
- to_add = {}
- end
-
- to_add[k.gsub(",", "_")] = v
- end
- end
-
- if to_delete
- to_delete.each { |k| env.delete(k) }
- env.merge! to_add
- end
-
- # A rack extension. If the app writes #call'ables to this
- # array, we will invoke them when the request is done.
- #
- after_reply = env[RACK_AFTER_REPLY] = []
-
- begin
- begin
- status, headers, res_body = @app.call(env)
-
- return :async if req.hijacked
-
- status = status.to_i
-
- if status == -1
- unless headers.empty? and res_body == []
- raise "async response must have empty headers and body"
- end
-
- return :async
- end
- rescue ThreadPool::ForceShutdown => e
- @events.log "Detected force shutdown of a thread, returning 503"
- @events.unknown_error self, e, "Rack app"
-
- status = 503
- headers = {}
- res_body = ["Request was internally terminated early\n"]
-
- rescue Exception => e
- @events.unknown_error self, e, "Rack app", env
-
- status, headers, res_body = lowlevel_error(e, env)
- end
-
- content_length = nil
- no_body = head
-
- if res_body.kind_of? Array and res_body.size == 1
- content_length = res_body[0].bytesize
- end
-
- cork_socket client
-
- line_ending = LINE_END
- colon = COLON
-
- http_11 = if env[HTTP_VERSION] == HTTP_11
- allow_chunked = true
- keep_alive = env.fetch(HTTP_CONNECTION, "").downcase != CLOSE
- include_keepalive_header = false
-
- # An optimization. The most common response is 200, so we can
- # reply with the proper 200 status without having to compute
- # the response header.
- #
- if status == 200
- lines << HTTP_11_200
- else
- lines.append "HTTP/1.1 ", status.to_s, " ",
- fetch_status_code(status), line_ending
-
- no_body ||= status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
- end
- true
- else
- allow_chunked = false
- keep_alive = env.fetch(HTTP_CONNECTION, "").downcase == KEEP_ALIVE
- include_keepalive_header = keep_alive
-
- # Same optimization as above for HTTP/1.1
- #
- if status == 200
- lines << HTTP_10_200
- else
- lines.append "HTTP/1.0 ", status.to_s, " ",
- fetch_status_code(status), line_ending
-
- no_body ||= status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
- end
- false
- end
-
- response_hijack = nil
-
- headers.each do |k, vs|
- case k.downcase
- when CONTENT_LENGTH2
- next if possible_header_injection?(vs)
- content_length = vs
- next
- when TRANSFER_ENCODING
- allow_chunked = false
- content_length = nil
- when HIJACK
- response_hijack = vs
- next
- end
-
- if vs.respond_to?(:to_s) && !vs.to_s.empty?
- vs.to_s.split(NEWLINE).each do |v|
- next if possible_header_injection?(v)
- lines.append k, colon, v, line_ending
- end
- else
- lines.append k, colon, line_ending
- end
- end
-
- if include_keepalive_header
- lines << CONNECTION_KEEP_ALIVE
- elsif http_11 && !keep_alive
- lines << CONNECTION_CLOSE
- end
-
- if no_body
- if content_length and status != 204
- lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
- end
-
- lines << line_ending
- fast_write client, lines.to_s
- return keep_alive
- end
-
- if content_length
- lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
- chunked = false
- elsif !response_hijack and allow_chunked
- lines << TRANSFER_ENCODING_CHUNKED
- chunked = true
- end
-
- lines << line_ending
-
- fast_write client, lines.to_s
-
- if response_hijack
- response_hijack.call client
- return :async
- end
-
- begin
- res_body.each do |part|
- next if part.bytesize.zero?
- if chunked
- fast_write client, part.bytesize.to_s(16)
- fast_write client, line_ending
- fast_write client, part
- fast_write client, line_ending
- else
- fast_write client, part
- end
-
- client.flush
- end
-
- if chunked
- fast_write client, CLOSE_CHUNKED
- client.flush
- end
- rescue SystemCallError, IOError
- raise ConnectionError, "Connection error detected during write"
- end
-
- ensure
- uncork_socket client
-
- body.close
- req.tempfile.unlink if req.tempfile
- res_body.close if res_body.respond_to? :close
-
- after_reply.each { |o| o.call }
- end
-
- return keep_alive
- end
-
- def fetch_status_code(status)
- HTTP_STATUS_CODES.fetch(status) { 'CUSTOM' }
- end
- private :fetch_status_code
-
- # Given the request +env+ from +client+ and the partial body +body+
- # plus a potential Content-Length value +cl+, finish reading
- # the body and return it.
- #
- # If the body is larger than MAX_BODY, a Tempfile object is used
- # for the body, otherwise a StringIO is used.
- #
- def read_body(env, client, body, cl)
- content_length = cl.to_i
-
- remain = content_length - body.bytesize
-
- return StringIO.new(body) if remain <= 0
-
- # Use a Tempfile if there is a lot of data left
- if remain > MAX_BODY
- stream = Tempfile.new(Const::PUMA_TMP_BASE)
- stream.binmode
+ lowlevel_error(e, client.env)
+ case e
+ when MiniSSL::SSLError
+ @events.ssl_error e, client.io
+ when HttpParserError
+ client.write_error(400)
+ @events.parse_error e, client
else
- # The body[0,0] trick is to get an empty string in the same
- # encoding as body.
- stream = StringIO.new body[0,0]
+ client.write_error(500)
+ @events.unknown_error e, nil, "Read"
end
-
- stream.write body
-
- # Read an odd sized chunk so we can read even sized ones
- # after this
- chunk = client.readpartial(remain % CHUNK_SIZE)
-
- # No chunk means a closed socket
- unless chunk
- stream.close
- return nil
- end
-
- remain -= stream.write(chunk)
-
- # Raed the rest of the chunks
- while remain > 0
- chunk = client.readpartial(CHUNK_SIZE)
- unless chunk
- stream.close
- return nil
- end
-
- remain -= stream.write(chunk)
- end
-
- stream.rewind
-
- return stream
end
# A fallback rack response if +@app+ raises an exception.
#
- def lowlevel_error(e, env)
+ def lowlevel_error(e, env, status=500)
if handler = @options[:lowlevel_error_handler]
if handler.arity == 1
return handler.call(e)
- else
+ elsif handler.arity == 2
return handler.call(e, env)
+ else
+ return handler.call(e, env, status)
end
end
if @leak_stack_on_error
- [500, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{e.backtrace.join("\n")}"]]
+ backtrace = e.backtrace.nil? ? '<no backtrace available>' : e.backtrace.join("\n")
+ [status, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{backtrace}"]]
else
- [500, {}, ["An unhandled lowlevel error occurred. The application logs may have details.\n"]]
+ [status, {}, ["An unhandled lowlevel error occurred. The application logs may have details.\n"]]
end
end
# Wait for all outstanding requests to finish.
#
@@ -966,54 +557,34 @@
$stdout.syswrite "#{pid}: #{t.backtrace.join("\n#{pid}: ")}\n\n"
end
$stdout.syswrite "#{pid}: === End thread backtrace dump ===\n"
end
- if @options[:drain_on_shutdown]
- count = 0
-
- while true
- ios = IO.select @binder.ios, nil, nil, 0
- break unless ios
-
- ios.first.each do |sock|
- begin
- if io = sock.accept_nonblock
- count += 1
- client = Client.new io, @binder.env(sock)
- @thread_pool << client
- end
- rescue SystemCallError
- end
- end
- end
-
- @events.debug "Drained #{count} additional connections."
+ if @status != :restart
+ @binder.close
end
if @thread_pool
if timeout = @options[:force_shutdown_after]
- @thread_pool.shutdown timeout.to_i
+ @thread_pool.shutdown timeout.to_f
else
@thread_pool.shutdown
end
end
end
def notify_safely(message)
- begin
- @notify << message
- rescue IOError
- # The server, in another thread, is shutting down
+ @notify << message
+ rescue IOError, NoMethodError, Errno::EPIPE
+ # The server, in another thread, is shutting down
+ Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
+ rescue RuntimeError => e
+ # Temporary workaround for https://bugs.ruby-lang.org/issues/13239
+ if e.message.include?('IOError')
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
- rescue RuntimeError => e
- # Temporary workaround for https://bugs.ruby-lang.org/issues/13239
- if e.message.include?('IOError')
- Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
- else
- raise e
- end
+ else
+ raise e
end
end
private :notify_safely
# Stops the acceptor thread and then causes the worker threads to finish
@@ -1027,46 +598,26 @@
def halt(sync=false)
notify_safely(HALT_COMMAND)
@thread.join if @thread && sync
end
- def begin_restart
+ def begin_restart(sync=false)
notify_safely(RESTART_COMMAND)
+ @thread.join if @thread && sync
end
- def fast_write(io, str)
- n = 0
- while true
- begin
- n = io.syswrite str
- rescue Errno::EAGAIN, Errno::EWOULDBLOCK
- if !IO.select(nil, [io], nil, WRITE_TIMEOUT)
- raise ConnectionError, "Socket timeout writing data"
- end
-
- retry
- rescue Errno::EPIPE, SystemCallError, IOError
- raise ConnectionError, "Socket timeout writing data"
- end
-
- return if n == str.bytesize
- str = str.byteslice(n..-1)
- end
- end
- private :fast_write
-
- ThreadLocalKey = :puma_server
-
- def self.current
- Thread.current[ThreadLocalKey]
- end
-
def shutting_down?
@status == :stop || @status == :restart
end
- def possible_header_injection?(header_value)
- HTTP_INJECTION_REGEX =~ header_value.to_s
+ # List of methods invoked by #stats.
+ # @version 5.0.0
+ STAT_METHODS = [:backlog, :running, :pool_capacity, :max_threads, :requests_count].freeze
+
+ # Returns a hash of stats about the running server for reporting purposes.
+ # @version 5.0.0
+ # @!attribute [r] stats
+ def stats
+ STAT_METHODS.map {|name| [name, send(name) || 0]}.to_h
end
- private :possible_header_injection?
end
end