lib/async/http/protocol/http2.rb in async-http-0.22.0 vs lib/async/http/protocol/http2.rb in async-http-0.23.0

- old
+ new

@@ -64,10 +64,17 @@ @controller.on(:frame_received) do |frame| Async.logger.debug(self) {"Received frame: #{frame.inspect}"} end + @controller.on(:goaway) do |payload| + Async.logger.error(self) {"goaway: #{payload.inspect}"} + + @reader.stop + @stream.io.close + end + @count = 0 end attr :count @@ -75,11 +82,11 @@ def multiplex @controller.remote_settings[:settings_max_concurrent_streams] end def reusable? - @reader.alive? + !@stream.closed? end def version VERSION end @@ -100,19 +107,18 @@ end end def close Async.logger.debug(self) {"Closing connection"} - @reader.stop + + @reader.stop if @reader @stream.close end def receive_requests(task: Task.current, &block) # emits new streams opened by the client @controller.on(:stream) do |stream| - @count += 1 - request = Request.new request.version = self.version request.headers = Headers.new body = Body::Writable.new request.body = body @@ -134,61 +140,68 @@ stream.on(:data) do |chunk| # puts "Got request data: #{chunk.inspect}" body.write(chunk.to_s) unless chunk.empty? end + stream.on(:close) do |error| + if error + body.stop(EOFError.new(error)) + end + end + stream.on(:half_close) do - # puts "Generating response..." - response = yield request - - # puts "Finishing body..." - body.finish - - # puts "Sending response..." - # send response - headers = {STATUS => response.status.to_s} - headers.update(response.headers) - - # puts "Sending headers #{headers}" - if response.body.nil? or response.body.empty? - stream.headers(headers, end_stream: true) - response.body.read if response.body - else - stream.headers(headers, end_stream: false) + begin + # We are no longer receiving any more data frames: + body.finish - # puts "Streaming body..." - response.body.each do |chunk| - # puts "Sending chunk #{chunk.inspect}" - stream.data(chunk, end_stream: false) + # Generate the response: + response = yield request + + headers = {STATUS => response.status.to_s} + headers.update(response.headers) + + if response.body.nil? or response.body.empty? + stream.headers(headers, end_stream: true) + response.body.read if response.body + else + stream.headers(headers, end_stream: false) + + response.body.each do |chunk| + stream.data(chunk, end_stream: false) + end + + stream.data("", end_stream: true) end + rescue + Async.logger.error(self) {$!} - # puts "Ending stream..." - stream.data("", end_stream: true) + # Generating the response failed. + stream.close(:internal_error) end end end start_connection @reader.wait end def call(request) - @count += 1 - request.version ||= self.version stream = @controller.new_stream + @count += 1 headers = { SCHEME => HTTPS, METHOD => request.method.to_s, PATH => request.path.to_s, AUTHORITY => request.authority.to_s, }.merge(request.headers) finished = Async::Notification.new + exception = nil response = Response.new response.version = self.version response.headers = {} body = Body::Writable.new response.body = body @@ -202,26 +215,44 @@ else response.headers[key] = value end end + # At this point, we are now expecting two events: data and close. + stream.on(:close) do |error| + # If we receive close after this point, it's not a request error, but a failure we need to signal to the body. + if error + body.stop(EOFError.new(error)) + else + body.finish + end + end + finished.signal end stream.on(:data) do |chunk| body.write(chunk.to_s) unless chunk.empty? end - stream.on(:close) do - body.finish + stream.on(:close) do |error| + # The remote server has closed the connection while we were sending the request. + if error + exception = EOFError.new(error) + finished.signal + end end if request.body.nil? or request.body.empty? stream.headers(headers, end_stream: true) request.body.read if request.body else - stream.headers(headers, end_stream: false) + begin + stream.headers(headers, end_stream: false) + rescue + raise RequestFailed.new + end request.body.each do |chunk| stream.data(chunk, end_stream: false) end @@ -229,13 +260,17 @@ end start_connection @stream.flush - # Async.logger.debug(self) {"Stream flushed, waiting for signal."} + Async.logger.debug(self) {"Stream flushed, waiting for signal."} finished.wait - # Async.logger.debug(self) {"Stream finished: #{response.inspect}"} + if exception + raise exception + end + + Async.logger.debug(self) {"Stream finished: #{response.inspect}"} return response end end end end