lib/async/http/protocol/http2/stream.rb in async-http-0.46.4 vs lib/async/http/protocol/http2/stream.rb in async-http-0.46.5

- old
+ new

@@ -42,89 +42,115 @@ return chunk end end class Output - def initialize(stream, body, task: Task.current) - @stream = stream + def self.for(stream, body) + output = self.new(stream, body) + output.start + + return output + end + + def initialize(stream, body) + @stream = stream @body = body - @remainder = nil @window_updated = Async::Condition.new + end + + def start(parent: Task.current) + if @body.respond_to?(:call) + @task = parent.async(&self.method(:stream)) + else + @task = parent.async(&self.method(:passthrough)) + end + end + + def stop(error) + # Ensure that invoking #close doesn't try to close the stream. + @stream = nil - @task = task.async(&self.method(:passthrough)) + @task&.stop end - # Reads chunks from the given body and writes them to the stream as fast as possible. - def passthrough(task) - while chunk = self.read + def write(chunk) + until chunk.empty? maximum_size = @stream.available_frame_size while maximum_size <= 0 @window_updated.wait maximum_size = @stream.available_frame_size end - self.send_data(chunk, maximum_size) + break unless chunk = send_data(chunk, maximum_size) end - - self.end_stream - rescue Async::Stop - # Ignore. - ensure - @body&.close($!) - @body = nil end - def read - if @remainder - remainder = @remainder - @remainder = nil + def window_updated(size) + @window_updated.signal + end + + def close(error = nil) + if @stream + if error + @stream.close(error) + else + self.close_write + end - return remainder - else - @body&.read + @stream = nil end end - def push(chunk) - @remainder = chunk + def close_write + @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM) end + private + + def stream(task) + task.annotate("Streaming #{@body} to #{@stream}.") + + @body.call(Body::Stream.new(@stream.input, self)) + rescue Async::Stop + # Ignore. + end + + # Reads chunks from the given body and writes them to the stream as fast as possible. + def passthrough(task) + task.annotate("Writing #{@body} to #{@stream}.") + + while chunk = @body&.read + self.write(chunk) + end + + self.close_write + rescue Async::Stop + # Ignore. + ensure + @body&.close($!) + @body = nil + end + # Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk. # @param maximum_size [Integer] send up to this many bytes of data. # @param stream [Stream] the stream to use for sending data frames. + # @return [String, nil] any data that could not be written. def send_data(chunk, maximum_size) if chunk.bytesize <= maximum_size @stream.send_data(chunk, maximum_size: maximum_size) else @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size) # The window was not big enough to send all the data, so we save it for next time: - self.push( - chunk.byteslice(maximum_size, chunk.bytesize - maximum_size) - ) + return chunk.byteslice(maximum_size, chunk.bytesize - maximum_size) end - end - - def end_stream - @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM) - end - - def window_updated(size) - @window_updated.signal - end - - def close(error) - if @body - @body.close(error) - @body = nil - end - @task&.stop + return nil end end def initialize(*) super @@ -140,10 +166,12 @@ @output = nil end attr_accessor :headers + attr :input + def add_header(key, value) if key == CONNECTION raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!" elsif key.start_with? ':' raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!" @@ -220,11 +248,11 @@ send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR) end # Set the body and begin sending it. def send_body(body) - @output = Output.new(self, body) + @output = Output.for(self, body) end def window_updated(size) super @@ -238,10 +266,10 @@ @input.close(error) @input = nil end if @output - @output.close(error) + @output.stop(error) @output = nil end end end end