# -*- encoding: binary -*- # Copyright (C) 2013, Eric Wong and all contributors # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) require_relative 'stream_file' require_relative 'wbuf_str' # Writes a Rack response to your client using the HTTP/1.1 specification. # You use it by simply doing: # # status, headers, body = rack_app.call(env) # http_response_write(status, headers, body) # # Most header correctness (including Content-Length and Content-Type) # is the job of Rack, with the exception of the "Date" header. module Yahns::HttpResponse # :nodoc: include Unicorn::HttpResponse if defined?(RUBY_ENGINE) && RUBY_ENGINE == "rbx" MTX = Mutex.new def httpdate MTX.synchronize { super } end end # avoid GC overhead for frequently used-strings: CONN_KA = "Connection: keep-alive\r\n\r\n" CONN_CLOSE = "Connection: close\r\n\r\n" Z = "" CCC_RESPONSE_START = [ 'HTTP', '/1.1 ' ] RESPONSE_START = CCC_RESPONSE_START.join REQUEST_METHOD = "REQUEST_METHOD" HEAD = "HEAD" # no point in using one without the other, these have been in Linux # for ages if Socket.const_defined?(:MSG_MORE) && Socket.const_defined?(:MSG_DONTWAIT) MSG_MORE = Socket::MSG_MORE MSG_DONTWAIT = Socket::MSG_DONTWAIT else MSG_MORE = 0 MSG_DONTWAIT = 0 def kgio_syssend(buf, flags) kgio_trywrite(buf) end end def response_start @response_start_sent ? Z : RESPONSE_START end def response_wait_write(rv) # call the kgio_wait_readable or kgio_wait_writable method ok = __send__("kgio_#{rv}") and return ok k = self.class k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\ "#{k.client_timeout}s") false end def err_response(code) "#{response_start}#{CODES[code]}\r\n\r\n" end def response_header_blocked(ret, header, body, alive, offset, count) if body.respond_to?(:to_path) alive = Yahns::StreamFile.new(body, alive, offset, count) body = nil end wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir) rv = wbuf.wbuf_write(self, header) body.each { |chunk| rv = wbuf.wbuf_write(self, chunk) } if body wbuf_maybe(wbuf, rv) end def wbuf_maybe(wbuf, rv) case rv # wbuf_write return value when nil # all done case rv = wbuf.wbuf_close(self) when :ignore # hijacked @state = rv when Yahns::StreamFile @state = rv :wait_writable when true, false http_response_done(rv) end else @state = wbuf rv end end def http_response_done(alive) @input = @input.close if @input if alive @response_start_sent = false # @hs.buf will have data if the client pipelined if @hs.buf.empty? @state = :headers :wait_readable else @state = :pipelined # we shouldn't start processing the application again until we know # the socket is writable for the response :wait_writable end else # shutdown is needed in case the app forked, we rescue here since # StreamInput may issue shutdown as well shutdown rescue nil :close end end def kv_str(key, value) if value =~ /\n/ # avoiding blank, key-only cookies with /\n+/ value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join else "#{key}: #{value}\r\n" end end def have_more?(value) value.to_i > 0 && @hs.env[REQUEST_METHOD] != HEAD end # writes the rack_response to socket as an HTTP response # returns :wait_readable, :wait_writable, :forget, or nil def http_response_write(status, headers, body) status = CODES[status.to_i] || status offset = 0 count = hijack = nil k = self.class alive = @hs.next? && k.persistent_connections flags = MSG_DONTWAIT if @hs.headers? buf = "#{response_start}#{status}\r\nDate: #{httpdate}\r\n" headers.each do |key, value| case key when %r{\ADate\z}i next when %r{\AContent-Range\z}i if %r{\Abytes (\d+)-(\d+)/\d+\z} =~ value offset = $1.to_i count = $2.to_i - offset + 1 end buf << kv_str(key, value) when %r{\AConnection\z}i # allow Rack apps to tell us they want to drop the client alive = false if value =~ /\bclose\b/i when %r{\AContent-Length\z}i flags |= MSG_MORE if have_more?(value) buf << kv_str(key, value) when "rack.hijack" hijack = value body = nil # ensure we do not close body else buf << kv_str(key, value) end end buf << (alive ? CONN_KA : CONN_CLOSE) case rv = kgio_syssend(buf, flags) when nil # all done, likely break when String flags = MSG_DONTWAIT buf = rv # hope the skb grows when :wait_writable, :wait_readable if k.output_buffering alive = hijack ? hijack : alive rv = response_header_blocked(rv, buf, body, alive, offset, count) body = nil # ensure we do not close body in ensure return rv else response_wait_write(rv) or return :close end end while true end return response_hijacked(hijack) if hijack if body.respond_to?(:to_path) @state = body = Yahns::StreamFile.new(body, alive, offset, count) return step_write end wbuf = rv = nil body.each do |chunk| if wbuf rv = wbuf.wbuf_write(self, chunk) else case rv = kgio_trywrite(chunk) when nil # all done, likely and good! break when String chunk = rv # hope the skb grows when we loop into the trywrite when :wait_writable, :wait_readable if k.output_buffering wbuf = Yahns::Wbuf.new(body, alive, k.output_buffer_tmpdir) rv = wbuf.wbuf_write(self, chunk) break else response_wait_write(rv) or return :close end end while true end end # if we buffered the write body, we must return :wait_writable # (or :wait_readable for SSL) and hit Yahns::HttpClient#step_write if wbuf body = nil # ensure we do not close the body in ensure wbuf_maybe(wbuf, rv) else http_response_done(alive) end ensure body.respond_to?(:close) and body.close end # returns nil on success # :wait_readable/:wait_writable/:close for epoll def do_ccc @response_start_sent = true wbuf = nil rv = nil CCC_RESPONSE_START.each do |buf| if wbuf wbuf << buf else case rv = kgio_trywrite(buf) when nil break when String buf = rv when :wait_writable, :wait_readable if self.class.output_buffering wbuf = buf.dup @state = Yahns::WbufStr.new(wbuf, :ccc_done) break else response_wait_write(rv) or return :close end end while true end end rv end # only used if input_buffering is true (not :lazy or false) # input_buffering==:lazy/false gives control to the app # returns nil on success # returns :close, :wait_writable, or :wait_readable def http_100_response(env) env.delete("HTTP_EXPECT") =~ /\A100-continue\z/i or return buf = @response_start_sent ? "100 Continue\r\n\r\nHTTP/1.1 ".freeze : "HTTP/1.1 100 Continue\r\n\r\n".freeze case rv = kgio_trywrite(buf) when String buf = rv when :wait_writable, :wait_readable if self.class.output_buffering @state = Yahns::WbufStr.new(buf, :r100_done) return rv else response_wait_write(rv) or return :close end else return rv end while true end end