lib/async/http/protocol/http2/stream.rb in async-http-0.45.4 vs lib/async/http/protocol/http2/stream.rb in async-http-0.45.6

- old
+ new

@@ -23,139 +23,193 @@ module Async module HTTP module Protocol module HTTP2 class Stream < ::Protocol::HTTP2::Stream - def initialize(delegate, *args) - super(*args) + class Buffer + def initialize(stream, body, task: Task.current) + @stream = stream + + @body = body + @remainder = nil + + @window_updated = Async::Condition.new + + @task = task.async(&self.method(:passthrough)) + end - @delegate = delegate + def passthrough(task) + while chunk = self.read + maximum_size = @stream.available_frame_size + + while maximum_size <= 0 + @window_updated.wait + + maximum_size = @stream.available_frame_size + end + + self.send_data(chunk, maximum_size) + end + + self.end_stream + rescue Async::Stop + # Ignore. + ensure + @body&.close($!) + @body = nil + end - # This is the body that is being sent. - @body = nil + def read + if @remainder + remainder = @remainder + @remainder = nil + + return remainder + else + @body.read + end + end - # The remainder of the current chunk being sent. - @remainder = nil + def push(chunk) + @remainder = chunk + end - # The task that is handling sending the body. - @task = nil - end - - attr_accessor :delegate - attr :body - - def create_push_promise_stream(headers) - @delegate.create_push_promise_stream(headers) - end - - def accept_push_promise_stream(headers, stream_id) - @delegate.accept_push_promise_stream(headers, stream_id) - end - - def send_body(body, task: Async::Task.current) - # TODO Might need to stop this task when body is cancelled. - @task = task.async do |subtask| - subtask.annotate "Sending body: #{body.class}" - - @body = body - - window_updated + # Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk. + # @param maximum_size [Integer] send up to this many bytes of data. + # @param stream [Stream] the stream to use for sending data frames. + def send_data(chunk, maximum_size) + if chunk.bytesize <= maximum_size + @stream.send_data(chunk, maximum_size: maximum_size) + else + @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size) + + # The window was not big enough to send all the data, so we save it for next time: + self.push( + chunk.byteslice(maximum_size, chunk.bytesize - maximum_size) + ) + end end - end - - def send_chunk - maximum_size = self.available_frame_size - if maximum_size == 0 - return false + def end_stream + @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM) end - if @remainder - chunk = @remainder - @remainder = nil - elsif chunk = @body.read - # There was a new chunk of data to send - else + def window_updated(size) + @window_updated.signal + end + + def close(error) if @body - @body.close + @body.close(error) @body = nil end - # @body.read above might take a while and a stream reset might be received in the mean time. - unless closed? or @connection.closed? - send_data(nil, ::Protocol::HTTP2::END_STREAM) - end - - return false + @task&.stop end + end + + def initialize(*) + super - return false if closed? + @headers = nil + @trailers = nil - if chunk.bytesize <= maximum_size - send_data(chunk, maximum_size: maximum_size) + # Input buffer (receive_data): + @length = nil + @input = nil + + # Output buffer (window_updated): + @output = nil + end + + attr_accessor :headers + + def add_header(key, value) + if key == CONNECTION + raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!" + elsif key.start_with? ':' + raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!" + elsif key =~ /[A-Z]/ + raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!" else - send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size) - - @remainder = chunk.byteslice(maximum_size, chunk.bytesize - maximum_size) + @headers.add(key, value) end - - return true end - def window_updated - return unless @body - - while send_chunk - # There could be more data to send... + def add_trailer(key, value) + if @trailers.include(key) + add_header(key, value) + else + raise ::Protocol::HTTP2::HeaderError, "Cannot add trailer #{key} as it was not specified in trailers!" end end + def receive_trailing_headers(headers, end_stream) + headers.each do |key, value| + add_trailer(key, value) + end + end + def receive_headers(frame) - headers = super + if @headers.nil? + @headers = ::Protocol::HTTP::Headers.new + self.receive_initial_headers(super, frame.end_stream?) + @trailers = @headers[TRAILERS] + elsif @trailers and frame.end_stream? + self.receive_trailing_headers(super, frame.end_stream?) + else + raise ::Protocol::HTTP2::HeaderError, "Unable to process headers!" + end + rescue ::Protocol::HTTP2::HeaderError => error + Async.logger.error(self, error) - @delegate.receive_headers(self, headers, frame.end_stream?) - - return headers + send_reset_stream(error.code) end - def receive_data(frame) - data = super + def process_data(frame) + data = frame.unpack - if data - @delegate.receive_data(self, data, frame.end_stream?) + if @input + unless data.empty? + @input.write(data) + end + + if frame.end_stream? + @input.close + @input = nil + end end return data + rescue ::Protocol::HTTP2::ProtocolError + raise + rescue # Anything else... + send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR) end - def receive_reset_stream(frame) - error_code = super - - if @body - @body.close(EOFError.new(error_code)) - @body = nil - end - - @delegate.receive_reset_stream(self, error_code) - - return error_code + # Set the body and begin sending it. + def send_body(body) + @output = Buffer.new(self, body) end - def close! - @delegate.close! - + def window_updated(size) super + + @output&.window_updated(size) end def close(error = nil) super - if @body - @body.close(error) - @body = nil + if @input + @input.close(error) + @input = nil end - @delegate.stream_closed(error) + if @output + @output.close(error) + @output = nil + end end end end end end