# -*- encoding: binary -*-
# Copyright (C) 2013, Eric Wong and all contributors
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)

# Client socket driven as a small state machine by the epoll/kqueue worker
# loop (see #yahns_step).  @state is one of :headers, :body, :trailers,
# :pipelined, :ccc_done, :r100_done, :ignore, or a write-buffer object
# (anything matching Yahns::WbufCommon) while a response is still being
# flushed.
class Yahns::HttpClient < Kgio::Socket # :nodoc:
  # shared, empty request body used whenever Content-Length is zero
  NULL_IO = StringIO.new("")

  # FIXME: we shouldn't have this at all
  Unicorn::HttpParser.keepalive_requests = 0xffffffff

  include Yahns::HttpResponse
  QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor

  # A frozen format for this is about 15% faster (note from Mongrel)
  REMOTE_ADDR = 'REMOTE_ADDR'.freeze
  RACK_INPUT = 'rack.input'.freeze
  RACK_HIJACK = 'rack.hijack'.freeze
  RACK_HIJACK_IO = "rack.hijack_io".freeze

  # called from acceptor thread
  def yahns_init
    @hs = Unicorn::HttpRequest.new
    @response_start_sent = false
    @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile
    @input = nil
  end

  # use if writes are deferred by buffering, this return value goes to
  # the main epoll/kqueue worker loop
  # returns :wait_readable, :wait_writable, :ignore, or whatever
  # http_response_done returns once the buffered response is fully flushed
  def step_write
    while true
      case rv = @state.wbuf_flush(self)
      when :wait_writable, :wait_readable
        return rv # tell epoll/kqueue to wait on this more
      when :ignore # :ignore on hijack
        @state = :ignore
        return :ignore
      when Yahns::StreamFile
        @state = rv # continue looping
      when true, false # done
        return http_response_done(rv)
      when :ccc_done, :r100_done
        # remember the marker so the next yahns_step resumes from it
        @state = rv
        return :wait_writable
      else
        raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
      end
    end
  end

  # used only with "input_buffering true"
  # Switches to the :body state, allocates @input via tmpio_for and writes
  # whatever body bytes are already sitting in the parser buffer into it.
  # Raises Unicorn::RequestEntityTooLargeError if a Content-Length is given
  # and it exceeds client_max_body_size.
  def mkinput_preread
    k = self.class
    len = @hs.content_length
    mbs = k.client_max_body_size
    if mbs && len && len > mbs
      # the trailing [] is an explicit (empty) backtrace
      raise Unicorn::RequestEntityTooLargeError,
            "Content-Length:#{len} too large (>#{mbs})", []
    end
    @state = :body
    @input = k.tmpio_for(len)

    rbuf = Thread.current[:yahns_rbuf]
    @hs.filter_body(rbuf, @hs.buf)
    @input.write(rbuf)
  end

  # Called once the request headers are parsed to decide how the request
  # body will be supplied to the app.
  # returns false if the caller must keep looping (@state is now :body)
  # returns a truthy http_100_response result for the caller to hand back
  # to epoll (only with "input_buffering true")
  # otherwise returns the object to use as rack.input (NULL_IO for an
  # empty body)
  def input_ready
    empty_body = 0 == @hs.content_length
    k = self.class
    case k.input_buffering
    when true
      rv = http_100_response(@hs.env)
      return rv if rv

      # common case is an empty body
      return NULL_IO if empty_body

      # content_length is nil (chunked) or len > 0
      mkinput_preread # keep looping
      false
    else # :lazy, false
      empty_body ? NULL_IO : (@input = k.mkinput(self, @hs))
    end
  end

  # returns true if we want to keep looping on this
  # returns :wait_readable/wait_writable/nil to yield back to epoll
  def fill_body(rsize, rbuf)
    case rv = kgio_tryread(rsize, rbuf)
    when String
      @hs.filter_body(rbuf, @hs.buf << rbuf)
      @input.write(rbuf)
      true # keep looping on kgio_tryread (but check body_eof? first)
    when :wait_readable, :wait_writable
      rv # have epoll/kqueue wait for more
    when nil # unexpected EOF
      @input.close
      nil # explicit, do not depend on the return value of #close
    end
  end

  # returns true if we are ready to dispatch the app
  # returns :wait_readable/wait_writable/nil to yield back to epoll
  def read_trailers(rsize, rbuf)
    while true
      case rv = kgio_tryread(rsize, rbuf)
      when String
        if @hs.add_parse(rbuf)
          @input.rewind
          return true
        end
        # keep looping on kgio_tryread...
      when :wait_readable, :wait_writable
        return rv # wait for more
      when nil # unexpected EOF
        @input.close
        return nil # explicit, do not depend on the return value of #close
      end
    end
  end

  # the main entry point of the epoll/kqueue worker loop
  # Any StandardError raised while stepping is passed to handle_error,
  # whose (nil) result is returned.
  def yahns_step
    # always write unwritten data first if we have any
    return step_write if Yahns::WbufCommon === @state

    # only read if we had nothing to write in this event loop iteration
    k = self.class
    rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads

    # outer loop: re-dispatch on @state after every state transition.
    # This is a plain "while" on purpose: Kernel#loop would swallow a
    # StopIteration raised below instead of letting it reach the rescue.
    while true
      case @state
      when :pipelined
        if @hs.parse
          case input = input_ready
          when :wait_readable, :wait_writable, :close then return input
          when false # keep looping on @state
          else
            return app_call(input)
          end
          # @state == :body if we get to this point
          # (input_ready -> mkinput_preread)
        else
          @state = :headers
        end
        # continue to outer loop
      when :headers
        while true
          case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
          when String
            if @hs.add_parse(rv)
              case input = input_ready
              when :wait_readable, :wait_writable, :close then return input
              when false then break # to outer loop to reevaluate @state == :body
              else
                return app_call(input)
              end
            end
            # keep looping on kgio_tryread
          when :wait_readable, :wait_writable, nil
            return rv
          end
        end
      when :body
        if @hs.body_eof?
          if @hs.content_length || @hs.parse # hp.parse == trailers done!
            @input.rewind
            return app_call(@input)
          else # possible Transfer-Encoding:chunked, keep looping
            @state = :trailers
          end
        else
          rv = fill_body(k.client_body_buffer_size, rbuf)
          return rv unless true == rv
        end
      when :trailers
        rv = read_trailers(k.client_header_buffer_size, rbuf)
        return true == rv ? app_call(@input) : rv
      when :ccc_done # unlikely
        return app_call(nil)
      when :r100_done # unlikely
        rv = r100_done
        return rv unless rv == true
        raise "BUG: body=#@state " if @state != :body
        # @state == :body, keep looping
      end
    end
  rescue => e
    handle_error(e)
  end

  # returns :wait_readable, :wait_writable, :ignore, or nil for epoll
  # returns true to keep looping inside yahns_step (@state is then :body)
  def r100_done
    k = self.class
    case k.input_buffering
    when true
      empty_body = 0 == @hs.content_length
      # common case is an empty body
      return app_call(NULL_IO) if empty_body

      # content_length is nil (chunked) or len > 0
      mkinput_preread # keep looping (@state == :body)
      true
    else # :lazy, false
      http_response_write(*k.app.call(@hs.env))
    end
  end

  # Fills in the per-request env entries, runs the Rack app and writes its
  # response.
  # +input+ is the object to expose as rack.input, or nil when resuming
  # after the check_client_connection write (env is already populated then)
  # returns :wait_readable, :wait_writable, :ignore, or nil for epoll
  def app_call(input)
    env = @hs.env
    k = self.class

    # input is nil if we needed to wait for writability with
    # check_client_connection
    if input
      env[REMOTE_ADDR] = @kgio_addr
      env[RACK_HIJACK] = hijack_proc(env)
      env[RACK_INPUT] = input

      if k.check_client_connection && @hs.headers?
        rv = do_ccc
        return rv if rv
      end
    end

    # run the rack app
    status, headers, body = k.app.call(env.merge!(k.app_defaults))

    # the app took over the socket via rack.hijack, leave it alone
    return :ignore if env.include?(RACK_HIJACK_IO)

    if status.to_i == 100
      rv = http_100_response(env)
      return rv if rv
      status, headers, body = k.app.call(env)
    end

    # this returns :wait_readable, :wait_writable, :ignore, or nil:
    http_response_write(status, headers, body)
  end

  # returns the callable stored in env["rack.hijack"]; calling it removes
  # this socket from the queue and exposes it as env["rack.hijack_io"]
  def hijack_proc(env)
    proc do
      self.class.queue.queue_del(self) # EPOLL_CTL_DEL
      env[RACK_HIJACK_IO] = self
    end
  end

  # called automatically by kgio_write
  def kgio_wait_writable(timeout = self.class.client_timeout)
    super timeout
  end

  # called automatically by kgio_read
  def kgio_wait_readable(timeout = self.class.client_timeout)
    super timeout
  end

  # used by StreamInput (and thus TeeInput) for input_buffering {false|:lazy}
  # Retries kgio_tryread until it yields a String or nil, waiting for
  # readiness in between.
  # Raises Yahns::ClientTimeout if a wait returns a false value.
  def yahns_read(bytes, buf)
    while true
      case rv = kgio_tryread(bytes, buf)
      when String, nil
        return rv
      when :wait_readable
        raise Yahns::ClientTimeout, "waiting for read", [] unless kgio_wait_readable
      when :wait_writable
        raise Yahns::ClientTimeout, "waiting for write", [] unless kgio_wait_writable
      end
    end
  end

  # Hands this socket over to +fn+ (called with self as its only argument)
  # and returns :ignore.
  def response_hijacked(fn)
    # we must issue EPOLL_CTL_DEL before hijacking (if we issue it at all),
    # because the hijacker may close us before we get back to the epoll worker
    # loop.  EPOLL_CTL_DEL saves about 200 bytes of unswappable kernel memory,
    # so it can matter if we have lots of hijacked sockets.
    self.class.queue.queue_del(self)
    fn.call(self)
    :ignore
  end

  # if we get any error, try to write something back to the client
  # assuming we haven't closed the socket, but don't get hung up
  # if the socket is already closed or broken.  We'll always return
  # nil to ensure the socket is closed at the end of this function
  def handle_error(e)
    code = case e
    when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::ENOTCONN
      return # don't send response, drop the connection
    when Yahns::ClientTimeout
      408
    when Unicorn::RequestURITooLongError
      414
    when Unicorn::RequestEntityTooLargeError
      413
    when Unicorn::HttpParserError # try to tell the client they're bad
      400
    else
      Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
      500
    end
    kgio_trywrite(err_response(code))
  rescue
    # deliberately ignored: failing to report an error must not raise again
  ensure
    shutdown rescue nil
    return # always drop the connection on uncaught errors
  end
end