lib/async/http/protocol/http2.rb in async-http-0.22.0 vs lib/async/http/protocol/http2.rb in async-http-0.23.0
- old
+ new
@@ -64,10 +64,17 @@
@controller.on(:frame_received) do |frame|
Async.logger.debug(self) {"Received frame: #{frame.inspect}"}
end
+ @controller.on(:goaway) do |payload|
+ Async.logger.error(self) {"goaway: #{payload.inspect}"}
+
+ @reader.stop
+ @stream.io.close
+ end
+
@count = 0
end
attr :count
@@ -75,11 +82,11 @@
def multiplex
@controller.remote_settings[:settings_max_concurrent_streams]
end
def reusable?
- @reader.alive?
+ !@stream.closed?
end
def version
VERSION
end
@@ -100,19 +107,18 @@
end
end
def close
Async.logger.debug(self) {"Closing connection"}
- @reader.stop
+
+ @reader.stop if @reader
@stream.close
end
def receive_requests(task: Task.current, &block)
# emits new streams opened by the client
@controller.on(:stream) do |stream|
- @count += 1
-
request = Request.new
request.version = self.version
request.headers = Headers.new
body = Body::Writable.new
request.body = body
@@ -134,61 +140,68 @@
stream.on(:data) do |chunk|
# puts "Got request data: #{chunk.inspect}"
body.write(chunk.to_s) unless chunk.empty?
end
+ stream.on(:close) do |error|
+ if error
+ body.stop(EOFError.new(error))
+ end
+ end
+
stream.on(:half_close) do
- # puts "Generating response..."
- response = yield request
-
- # puts "Finishing body..."
- body.finish
-
- # puts "Sending response..."
- # send response
- headers = {STATUS => response.status.to_s}
- headers.update(response.headers)
-
- # puts "Sending headers #{headers}"
- if response.body.nil? or response.body.empty?
- stream.headers(headers, end_stream: true)
- response.body.read if response.body
- else
- stream.headers(headers, end_stream: false)
+ begin
+ # We are no longer receiving any more data frames:
+ body.finish
- # puts "Streaming body..."
- response.body.each do |chunk|
- # puts "Sending chunk #{chunk.inspect}"
- stream.data(chunk, end_stream: false)
+ # Generate the response:
+ response = yield request
+
+ headers = {STATUS => response.status.to_s}
+ headers.update(response.headers)
+
+ if response.body.nil? or response.body.empty?
+ stream.headers(headers, end_stream: true)
+ response.body.read if response.body
+ else
+ stream.headers(headers, end_stream: false)
+
+ response.body.each do |chunk|
+ stream.data(chunk, end_stream: false)
+ end
+
+ stream.data("", end_stream: true)
end
+ rescue
+ Async.logger.error(self) {$!}
- # puts "Ending stream..."
- stream.data("", end_stream: true)
+ # Generating the response failed.
+ stream.close(:internal_error)
end
end
end
start_connection
@reader.wait
end
def call(request)
- @count += 1
-
request.version ||= self.version
stream = @controller.new_stream
+ @count += 1
headers = {
SCHEME => HTTPS,
METHOD => request.method.to_s,
PATH => request.path.to_s,
AUTHORITY => request.authority.to_s,
}.merge(request.headers)
finished = Async::Notification.new
+ exception = nil
response = Response.new
response.version = self.version
response.headers = {}
body = Body::Writable.new
response.body = body
@@ -202,26 +215,44 @@
else
response.headers[key] = value
end
end
+ # At this point, we are now expecting two events: data and close.
+ stream.on(:close) do |error|
+ # If we receive close after this point, it's not a request error, but a failure we need to signal to the body.
+ if error
+ body.stop(EOFError.new(error))
+ else
+ body.finish
+ end
+ end
+
finished.signal
end
stream.on(:data) do |chunk|
body.write(chunk.to_s) unless chunk.empty?
end
- stream.on(:close) do
- body.finish
+ stream.on(:close) do |error|
+ # The remote server has closed the connection while we were sending the request.
+ if error
+ exception = EOFError.new(error)
+ finished.signal
+ end
end
if request.body.nil? or request.body.empty?
stream.headers(headers, end_stream: true)
request.body.read if request.body
else
- stream.headers(headers, end_stream: false)
+ begin
+ stream.headers(headers, end_stream: false)
+ rescue
+ raise RequestFailed.new
+ end
request.body.each do |chunk|
stream.data(chunk, end_stream: false)
end
@@ -229,13 +260,17 @@
end
start_connection
@stream.flush
- # Async.logger.debug(self) {"Stream flushed, waiting for signal."}
+ Async.logger.debug(self) {"Stream flushed, waiting for signal."}
finished.wait
- # Async.logger.debug(self) {"Stream finished: #{response.inspect}"}
+ if exception
+ raise exception
+ end
+
+ Async.logger.debug(self) {"Stream finished: #{response.inspect}"}
return response
end
end
end
end