lib/async/http/protocol/http2/stream.rb in async-http-0.46.2 vs lib/async/http/protocol/http2/stream.rb in async-http-0.46.3

- old
+ new

@@ -17,17 +17,35 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. require 'protocol/http2/stream' +require_relative '../../body/writable' module Async module HTTP module Protocol module HTTP2 class Stream < ::Protocol::HTTP2::Stream - class Buffer + class Input < Body::Writable + def initialize(stream, length) + super(length) + + @stream = stream + end + + def read + if chunk = super + # If we read a chunk fron the stream, we want to extend the window if required so more data will be provided. + @stream.request_window_update + end + + return chunk + end + end + + class Output def initialize(stream, body, task: Task.current) @stream = stream @body = body @remainder = nil @@ -35,10 +53,11 @@ @window_updated = Async::Condition.new @task = task.async(&self.method(:passthrough)) end + # Reads chunks from the given body and writes them to the stream as fast as possible. def passthrough(task) while chunk = self.read maximum_size = @stream.available_frame_size while maximum_size <= 0 @@ -111,15 +130,15 @@ super @headers = nil @trailers = nil - # Input buffer (receive_data): + # Input buffer, reading request body, or response body (receive_data): @length = nil @input = nil - # Output buffer (window_updated): + # Output buffer, writing request body or response body (window_updated): @output = nil end attr_accessor :headers @@ -163,10 +182,25 @@ Async.logger.error(self, error) send_reset_stream(error.code) end + def prepare_input(length) + if @input.nil? + @input = Input.new(self, length) + else + raise ArgumentError, "Input body already prepared!" + end + end + + def update_local_window(frame) + consume_local_window(frame) + + # This is done on demand. + # request_window_update + end + def process_data(frame) data = frame.unpack if @input unless data.empty? @@ -186,10 +220,10 @@ send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR) end # Set the body and begin sending it. def send_body(body) - @output = Buffer.new(self, body) + @output = Output.new(self, body) end def window_updated(size) super