# -*- encoding: binary -*- # Copyright (C) 2015-2016 all contributors # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) # frozen_string_literal: true # loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for # constants. module Yahns::HttpResponse # :nodoc: # write everything in buf to our client socket (or wbuf, if it exists) # it may return a newly-created wbuf or nil def proxy_write(wbuf, buf, alive) unless wbuf # no write buffer, try to write directly to the client socket case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf) when nil then return # done writing buf, likely when String, Array # partial write, hope the skb grows buf = rv when :wait_writable, :wait_readable wbuf = Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, rv) buf = buf.join if Array === buf break end while true end wbuf.wbuf_write(self, buf) wbuf.busy ? wbuf : nil end def proxy_err_response(code, req_res, exc, wbuf) logger = @hs.env['rack.logger'] case exc when nil logger.error('premature upstream EOF') when Kcar::ParserError logger.error("upstream response error: #{exc.message}") else Yahns::Log.exception(logger, 'upstream error', exc) end # try to write something, but don't care if we fail Integer === code and kgio_trywrite("HTTP/1.1 #{code} #{ Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n") rescue nil shutdown rescue nil req_res.shutdown rescue nil nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb ensure wbuf.wbuf_abort if wbuf end def wait_on_upstream(req_res, alive, wbuf) req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, false) :wait_readable # self remains in :ignore, wait on upstream end # returns :wait_readable if we need to read more from req_res # returns :ignore if we yield control to the client(self) # returns nil if completely done def proxy_response_start(res, tip, kcar, req_res) status, headers = res code = status.to_i msg = Rack::Utils::HTTP_STATUS_CODES[code] env = @hs.env have_body = !Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code) && env['REQUEST_METHOD'] != 'HEAD'.freeze flags = MSG_DONTWAIT alive = @hs.next? && self.class.persistent_connections term = false response_headers = env['yahns.proxy_pass.response_headers'] res = "HTTP/1.1 #{msg ? %Q(#{code} #{msg}) : status}\r\n".dup headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays case key when /\A(?:Connection|Keep-Alive)\z/i next # do not let some upstream headers leak through when %r{\AContent-Length\z}i term = true flags |= MSG_MORE if have_body && value.to_i > 0 when %r{\ATransfer-Encoding\z}i term = true if value =~ /\bchunked\b/i end # response header mapping case val = response_headers[key] when :ignore next when String value = val end res << "#{key}: #{value}\r\n" end # For now, do not add a Date: header, assume upstream already did it # but do not care if they did not # chunk the response ourselves if the client supports it, # but the backend does not terminate properly if alive && ! term && (env['HTTP_VERSION'] == 'HTTP/1.1'.freeze) res << "Transfer-Encoding: chunked\r\n".freeze alive = true end res << (alive ? "Connection: keep-alive\r\n\r\n".freeze : "Connection: close\r\n\r\n".freeze) # send the headers case rv = kgio_syssend(res, flags) when nil then break # all done, likely when String # partial write, highly unlikely flags = MSG_DONTWAIT res = rv # hope the skb grows when :wait_writable, :wait_readable # highly unlikely in real apps wbuf = proxy_write(nil, res, alive) break # keep buffering as much as possible end while true rbuf = Thread.current[:yahns_rbuf] tip = tip.empty? ? [] : [ tip ] if have_body if len = kcar.body_bytes_left case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) when String len = kcar.body_bytes_left -= tmp.size wbuf = proxy_write(wbuf, tmp, alive) when nil # premature EOF return proxy_err_response(nil, req_res, nil, wbuf) when :wait_readable return wait_on_upstream(req_res, alive, wbuf) end until len == 0 elsif kcar.chunked? # nasty chunked body req_res.proxy_trailers = nil # define to avoid warnings for now buf = ''.dup case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) when String kcar.filter_body(buf, tmp) wbuf = proxy_write(wbuf, chunk_out(buf), alive) unless buf.empty? when nil # premature EOF return proxy_err_response(nil, req_res, nil, wbuf) when :wait_readable return wait_on_upstream(req_res, alive, wbuf) end until kcar.body_eof? buf = tmp req_res.proxy_trailers = [ buf, tlr = [] ] rbuf = Thread.current[:yahns_rbuf] = ''.dup until kcar.trailers(tlr, buf) case rv = req_res.kgio_tryread(0x2000, rbuf) when String buf << rv when :wait_readable return wait_on_upstream(req_res, alive, wbuf) when nil # premature EOF return proxy_err_response(nil, req_res, nil, wbuf) end # no loop here end wbuf = proxy_write(wbuf, trailer_out(tlr), alive) else # no Content-Length or Transfer-Encoding: chunked, wait on EOF! case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) when String tmp = chunk_out(tmp) if alive wbuf = proxy_write(wbuf, tmp, alive) when nil req_res.shutdown break when :wait_readable return wait_on_upstream(req_res, alive, wbuf) end while true end end return proxy_busy_mod_done(alive) unless wbuf req_res.resbuf = wbuf proxy_busy_mod_blocked(wbuf, wbuf.busy) rescue => e proxy_err_response(502, req_res, e, wbuf) end def proxy_response_finish(kcar, wbuf, req_res) rbuf = Thread.current[:yahns_rbuf] alive = wbuf.wbuf_persist if len = kcar.body_bytes_left # known Content-Length case tmp = req_res.kgio_tryread(0x2000, rbuf) when String len = kcar.body_bytes_left -= tmp.size wbuf.wbuf_write(self, tmp) when nil # premature EOF return proxy_err_response(nil, req_res, nil, wbuf) when :wait_readable return :wait_readable # self remains in :ignore, wait on upstream end while len != 0 elsif kcar.chunked? # nasty chunked response body buf = ''.dup unless req_res.proxy_trailers # are we done dechunking the main body, yet? case tmp = req_res.kgio_tryread(0x2000, rbuf) when String kcar.filter_body(buf, tmp) buf.empty? or wbuf.wbuf_write(self, chunk_out(buf)) when nil # premature EOF return proxy_err_response(nil, req_res, nil, wbuf) when :wait_readable return :wait_readable # self remains in :ignore, wait on upstream end until kcar.body_eof? req_res.proxy_trailers = [ tmp, [] ] # onto trailers! rbuf = Thread.current[:yahns_rbuf] = ''.dup end buf, tlr = *req_res.proxy_trailers until kcar.trailers(tlr, buf) case rv = req_res.kgio_tryread(0x2000, rbuf) when String buf << rv when :wait_readable return :wait_readable when nil # premature EOF return proxy_err_response(nil, req_res, nil, wbuf) end # no loop here end wbuf.wbuf_write(self, trailer_out(tlr)) else # no Content-Length or Transfer-Encoding: chunked, wait on EOF! case tmp = req_res.kgio_tryread(0x2000, rbuf) when String tmp = chunk_out(tmp) if alive wbuf.wbuf_write(self, tmp) when nil wbuf.wbuf_write(self, "0\r\n\r\n".freeze) if alive req_res.shutdown break when :wait_readable return :wait_readable # self remains in :ignore, wait on upstream end while true end busy = wbuf.busy and return proxy_busy_mod_blocked(wbuf, busy) proxy_busy_mod_done(alive) # returns nil end def proxy_wait_next(qflags) Thread.current[:yahns_fdmap].remember(self) # We must allocate a new, empty request object here to avoid a TOCTTOU # in the following timeline # # original thread: | another thread # HttpClient#yahns_step | # r = k.app.call(env = @hs.env) # socket hijacked into epoll queue # | epoll_wait readiness # | ReqRes#yahns_step # | proxy dispatch ... # | proxy_busy_mod_done # ************************** DANGER BELOW ******************************** # | HttpClient#yahns_step # | # clears env # sees empty env: | # return :ignore if env.include?('rack.hijack_io') | # # In other words, we cannot touch the original env seen by the # original thread since it must see the 'rack.hijack_io' value # because both are operating in the same Yahns::HttpClient object. # This will happen regardless of GVL existence hs = Unicorn::HttpRequest.new hs.buf.replace(@hs.buf) @hs = hs # n.b. we may not touch anything in this object once we call queue_mod, # another thread is likely to take it! Thread.current[:yahns_queue].queue_mod(self, qflags) end def proxy_busy_mod_done(alive) case http_response_done(alive) when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD) when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR) when :close then close end nil # close the req_res, too end def proxy_busy_mod_blocked(wbuf, busy) q = Thread.current[:yahns_queue] # we are completely done reading and buffering the upstream response, # but have not completely written the response to the client, # yield control to the client socket: @state = wbuf case busy when :wait_readable then q.queue_mod(self, Yahns::Queue::QEV_RD) when :wait_writable then q.queue_mod(self, Yahns::Queue::QEV_WR) else abort "BUG: invalid wbuf.busy: #{busy.inspect}" end # no touching self after queue_mod :ignore end # n.b.: we can use String#size for optimized dispatch under YARV instead # of String#bytesize because all the IO read methods return a binary # string when given a maximum read length def chunk_out(buf) [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ] end def trailer_out(tlr) "0\r\n#{tlr.map! do |k,v| "#{k}: #{v}\r\n" end.join}\r\n" end end