lib/async/http/protocol/http2.rb in async-http-0.24.3 vs lib/async/http/protocol/http2.rb in async-http-0.25.0

- old
+ new

@@ -21,10 +21,12 @@ require_relative '../request' require_relative '../response' require_relative '../headers' require_relative '../body/writable' +require_relative 'http11' + require 'async/notification' require 'http/2' module Async @@ -118,79 +120,116 @@ @reader.stop if @reader @stream.close end + class Request < HTTP::Request + def initialize(stream) + super(nil, nil, nil, VERSION, Headers.new, Body::Writable.new) + + @stream = stream + end + + attr :stream + + def assign_headers(headers) + headers.each do |key, value| + if key == METHOD + raise BadRequest, "Request method already specified" if @method + + @method = value + elsif key == PATH + raise BadRequest, "Request path already specified" if @path + + @path = value + elsif key == AUTHORITY + raise BadRequest, "Request authority already specified" if @authority + + @authority = value + else + @headers[key] = value + end + end + end + + def hijack? + false + end + end + def receive_requests(task: Task.current, &block) # emits new streams opened by the client @controller.on(:stream) do |stream| - request = Request.new - request.version = self.version - request.headers = Headers.new - body = Body::Writable.new - request.body = body + request = Request.new(stream) + body = request.body stream.on(:headers) do |headers| - headers.each do |key, value| - if key == METHOD - request.method = value - elsif key == PATH - request.path = value - elsif key == AUTHORITY - request.authority = value - else - request.headers[key] = value + begin + request.assign_headers(headers) + rescue + Async.logger.error(self) {$!} + + stream.headers({ + STATUS => "400" + }, end_stream: true) + else + task.async do + generate_response(request, stream, &block) end end end stream.on(:data) do |chunk| - # puts "Got request data: #{chunk.inspect}" body.write(chunk.to_s) unless chunk.empty? end - stream.on(:close) do |error| - if error - body.stop(EOFError.new(error)) - end + stream.on(:half_close) do + # We are no longer receiving any more data frames: + body.finish end - stream.on(:half_close) do - # The requirements for this to be in lock-step with other opertaions is minimal. - # TODO consider putting this in it's own async task. - begin - # We are no longer receiving any more data frames: - body.finish - - # Generate the response: - response = yield request - - headers = {STATUS => response.status.to_s} - headers.update(response.headers) - - if response.body.nil? or response.body.empty? - stream.headers(headers, end_stream: true) - response.body.read if response.body - else - stream.headers(headers, end_stream: false) - - response.body.each do |chunk| - stream.data(chunk, end_stream: false) - end - - stream.data("", end_stream: true) - end - rescue - Async.logger.error(self) {$!} - - # Generating the response failed. - stream.close(:internal_error) - end + stream.on(:close) do |error| + body.stop(EOFError.new(error)) if error end end start_connection @reader.wait + end + + # Generate a response to the request. If this fails, the stream is terminated and the error is reported. + private def generate_response(request, stream, &block) + # We need to close the stream if the user code blows up while generating a response: + response = begin + yield(request, stream) + rescue + stream.close(:internal_error) + + raise + end + + if response + headers = Headers::Merged.new({ + STATUS => response.status, + }, response.headers) + + if response.body.nil? or response.body.empty? + stream.headers(headers, end_stream: true) + response.body.read if response.body + else + stream.headers(headers, end_stream: false) + + response.body.each do |chunk| + stream.data(chunk, end_stream: false) + end + + stream.data("", end_stream: true) + end + else + stream.close(:internal_error) unless stream.state == :closed + end + rescue + Async.logger.error(request) {$!} end def call(request) request.version ||= self.version