lib/async/http/protocol/http11.rb in async-http-0.22.0 vs lib/async/http/protocol/http11.rb in async-http-0.23.0

- old
+ new

@@ -18,10 +18,12 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. require 'async/io/protocol/line' +require_relative 'request_failed' + require_relative '../request' require_relative '../response' require_relative '../headers' require_relative '../body/chunked' @@ -31,11 +33,16 @@ module HTTP module Protocol # Implements basic HTTP/1.1 request/response. class HTTP11 < Async::IO::Protocol::Line CRLF = "\r\n".freeze + CONNECTION = 'connection'.freeze + HOST = 'host'.freeze + CLOSE = 'close'.freeze + VERSION = "HTTP/1.1".freeze + def initialize(stream) super(stream, CRLF) @persistent = true @count = 0 @@ -47,115 +54,122 @@ def multiplex 1 end def reusable? - @persistent + @persistent && !@stream.closed? end class << self alias server new alias client new end - CLOSE = 'close'.freeze - - VERSION = "HTTP/1.1".freeze - def version VERSION end def persistent?(headers) - headers['connection'] != CLOSE + headers.delete(CONNECTION) != CLOSE end # Server loop. def receive_requests(task: Task.current) - while true + while @persistent request = Request.new(*read_request) - @count += 1 + unless persistent?(request.headers) + @persistent = false + end + response = yield request response.version ||= request.version write_response(response.version, response.status, response.headers, response.body) request.finish - unless persistent?(request.headers) and persistent?(response.headers) - @persistent = false - - break - end - # This ensures we yield at least once every iteration of the loop and allow other fibers to execute. task.yield end end def call(request) - @count += 1 - request.version ||= self.version Async.logger.debug(self) {"#{request.method} #{request.path} #{request.headers.inspect}"} - write_request(request.authority, request.method, request.path, request.version, request.headers, request.body) + # We carefully interpret https://tools.ietf.org/html/rfc7230#section-6.3.1 to implement this correctly. + begin + write_request(request.authority, request.method, request.path, request.version, request.headers) + rescue + # If we fail to fully write the request and body, we can retry this request. + raise RequestFailed.new + end + + # Once we start writing the body, we can't recover if the request fails. That's because the body might be generated dynamically, streaming, etc. + write_body(request.body) + return Response.new(*read_response) rescue EOFError - Async.logger.debug(self) {"Connection failed with EOFError after #{@count} requests."} - return nil + # This will ensure that #reusable? returns false. + @stream.close + + raise end - def write_request(authority, method, path, version, headers, body) + def write_request(authority, method, path, version, headers) @stream.write("#{method} #{path} #{version}\r\n") @stream.write("Host: #{authority}\r\n") - write_headers(headers) - write_body(body) @stream.flush - - return true end def read_response version, status, reason = read_line.split(/\s+/, 3) headers = read_headers body = read_body(headers) + @count += 1 + @persistent = persistent?(headers) return version, Integer(status), reason, headers, body end def read_request method, path, version = read_line.split(/\s+/, 3) headers = read_headers body = read_body(headers) - return headers.delete('host'), method, path, version, headers, body + @count += 1 + + return headers.delete(HOST), method, path, version, headers, body end def write_response(version, status, headers, body) @stream.write("#{version} #{status}\r\n") write_headers(headers) write_body(body) @stream.flush - - return true end protected + def write_persistent_header + @stream.write("Connection: close\r\n") unless @persistent + end + def write_headers(headers) headers.each do |name, value| @stream.write("#{name}: #{value}\r\n") end + + write_persistent_header end def read_headers fields = [] @@ -194,9 +208,11 @@ body.each do |chunk| @stream.write(chunk) end end + + @stream.flush end def read_body(headers) if headers['transfer-encoding'] == 'chunked' return Body::Chunked.new(self)