lib/async/http/protocol/http2.rb in async-http-0.24.3 vs lib/async/http/protocol/http2.rb in async-http-0.25.0
- old
+ new
@@ -21,10 +21,12 @@
require_relative '../request'
require_relative '../response'
require_relative '../headers'
require_relative '../body/writable'
+require_relative 'http11'
+
require 'async/notification'
require 'http/2'
module Async
@@ -118,79 +120,116 @@
@reader.stop if @reader
@stream.close
end
+ class Request < HTTP::Request
+ def initialize(stream)
+ super(nil, nil, nil, VERSION, Headers.new, Body::Writable.new)
+
+ @stream = stream
+ end
+
+ attr :stream
+
+ def assign_headers(headers)
+ headers.each do |key, value|
+ if key == METHOD
+ raise BadRequest, "Request method already specified" if @method
+
+ @method = value
+ elsif key == PATH
+ raise BadRequest, "Request path already specified" if @path
+
+ @path = value
+ elsif key == AUTHORITY
+ raise BadRequest, "Request authority already specified" if @authority
+
+ @authority = value
+ else
+ @headers[key] = value
+ end
+ end
+ end
+
+ def hijack?
+ false
+ end
+ end
+
def receive_requests(task: Task.current, &block)
# emits new streams opened by the client
@controller.on(:stream) do |stream|
- request = Request.new
- request.version = self.version
- request.headers = Headers.new
- body = Body::Writable.new
- request.body = body
+ request = Request.new(stream)
+ body = request.body
stream.on(:headers) do |headers|
- headers.each do |key, value|
- if key == METHOD
- request.method = value
- elsif key == PATH
- request.path = value
- elsif key == AUTHORITY
- request.authority = value
- else
- request.headers[key] = value
+ begin
+ request.assign_headers(headers)
+ rescue
+ Async.logger.error(self) {$!}
+
+ stream.headers({
+ STATUS => "400"
+ }, end_stream: true)
+ else
+ task.async do
+ generate_response(request, stream, &block)
end
end
end
stream.on(:data) do |chunk|
- # puts "Got request data: #{chunk.inspect}"
body.write(chunk.to_s) unless chunk.empty?
end
- stream.on(:close) do |error|
- if error
- body.stop(EOFError.new(error))
- end
+ stream.on(:half_close) do
+ # We are no longer receiving any more data frames:
+ body.finish
end
- stream.on(:half_close) do
- # The requirements for this to be in lock-step with other opertaions is minimal.
- # TODO consider putting this in it's own async task.
- begin
- # We are no longer receiving any more data frames:
- body.finish
-
- # Generate the response:
- response = yield request
-
- headers = {STATUS => response.status.to_s}
- headers.update(response.headers)
-
- if response.body.nil? or response.body.empty?
- stream.headers(headers, end_stream: true)
- response.body.read if response.body
- else
- stream.headers(headers, end_stream: false)
-
- response.body.each do |chunk|
- stream.data(chunk, end_stream: false)
- end
-
- stream.data("", end_stream: true)
- end
- rescue
- Async.logger.error(self) {$!}
-
- # Generating the response failed.
- stream.close(:internal_error)
- end
+ stream.on(:close) do |error|
+ body.stop(EOFError.new(error)) if error
end
end
start_connection
@reader.wait
+ end
+
+ # Generate a response to the request. If this fails, the stream is terminated and the error is reported.
+ private def generate_response(request, stream, &block)
+ # We need to close the stream if the user code blows up while generating a response:
+ response = begin
+ yield(request, stream)
+ rescue
+ stream.close(:internal_error)
+
+ raise
+ end
+
+ if response
+ headers = Headers::Merged.new({
+ STATUS => response.status,
+ }, response.headers)
+
+ if response.body.nil? or response.body.empty?
+ stream.headers(headers, end_stream: true)
+ response.body.read if response.body
+ else
+ stream.headers(headers, end_stream: false)
+
+ response.body.each do |chunk|
+ stream.data(chunk, end_stream: false)
+ end
+
+ stream.data("", end_stream: true)
+ end
+ else
+ stream.close(:internal_error) unless stream.state == :closed
+ end
+ rescue
+ Async.logger.error(request) {$!}
end
def call(request)
request.version ||= self.version