lib/async/http/protocol/http2/stream.rb in async-http-0.50.9 vs lib/async/http/protocol/http2/stream.rb in async-http-0.50.10

- old
+ new

@@ -19,161 +19,19 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. require 'protocol/http2/stream' -require_relative '../../body/writable' +require_relative 'input' +require_relative 'output' + module Async module HTTP module Protocol module HTTP2 class Stream < ::Protocol::HTTP2::Stream - # A writable body which requests window updates when data is read from it. - class Input < Body::Writable - def initialize(stream, length) - super(length) - - @stream = stream - @remaining = length - end - - def read - if chunk = super - # If we read a chunk fron the stream, we want to extend the window if required so more data will be provided. - @stream.request_window_update - end - - # We track the expected length and check we got what we were expecting. - if @remaining - if chunk - @remaining -= chunk.bytesize - elsif @remaining > 0 - raise EOFError, "Expected #{self.length} bytes, #{@remaining} bytes short!" - elsif @remaining < 0 - raise EOFError, "Expected #{self.length} bytes, #{@remaining} bytes over!" - end - end - - return chunk - end - end - - class Output - def self.for(stream, body) - output = self.new(stream, body) - - output.start - - return output - end - - def initialize(stream, body) - @stream = stream - @body = body - - @window_updated = Async::Condition.new - end - - def start(parent: Task.current) - if @body.respond_to?(:call) - @task = parent.async(&self.method(:stream)) - else - @task = parent.async(&self.method(:passthrough)) - end - end - - def stop(error) - # Ensure that invoking #close doesn't try to close the stream. - @stream = nil - - @task&.stop - end - - def write(chunk) - until chunk.empty? - maximum_size = @stream.available_frame_size - - while maximum_size <= 0 - @window_updated.wait - - maximum_size = @stream.available_frame_size - end - - break unless chunk = send_data(chunk, maximum_size) - end - end - - def window_updated(size) - @window_updated.signal - end - - def close(error = nil) - if @stream - if error - @stream.close(error) - else - self.close_write - end - - @stream = nil - end - end - - def close_write - @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM) - end - - private - - def stream(task) - task.annotate("Streaming #{@body} to #{@stream}.") - - input = @stream.wait_for_input - - @body.call(Body::Stream.new(input, self)) - rescue Async::Stop - # Ignore. - end - - # Reads chunks from the given body and writes them to the stream as fast as possible. - def passthrough(task) - task.annotate("Writing #{@body} to #{@stream}.") - - while chunk = @body&.read - self.write(chunk) - # TODO this reduces memory usage? - # chunk.clear unless chunk.frozen? - # GC.start - end - - self.close_write - rescue Async::Stop - # Ignore. - ensure - @body&.close($!) - @body = nil - end - - # Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk. - # @param maximum_size [Integer] send up to this many bytes of data. - # @param stream [Stream] the stream to use for sending data frames. - # @return [String, nil] any data that could not be written. - def send_data(chunk, maximum_size) - if chunk.bytesize <= maximum_size - @stream.send_data(chunk, maximum_size: maximum_size) - else - @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size) - - # The window was not big enough to send all the data, so we save it for next time: - return chunk.byteslice(maximum_size, chunk.bytesize - maximum_size) - end - - return nil - end - end - def initialize(*) super @headers = nil @trailers = nil @@ -277,17 +135,32 @@ # Set the body and begin sending it. def send_body(body) @output = Output.for(self, body) end + # Called when the output terminates normally. + def finish_output(error = nil) + @output = nil + + if error + send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR) + else + send_data(nil, ::Protocol::HTTP2::END_STREAM) + end + end + def window_updated(size) super @output&.window_updated(size) end - def close(error = nil) + # When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen: + # - A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task. + # - A frame is sent which causes this stream to enter the closed state. This method will be invoked from that task. + # While the input stream is relatively straight forward, the output stream can trigger the second case above + def closed(error) super if @input @input.close(error) @input = nil @@ -295,9 +168,11 @@ if @output @output.stop(error) @output = nil end + + return self end end end end end