lib/async/http/protocol/http2/stream.rb in async-http-0.50.9 vs lib/async/http/protocol/http2/stream.rb in async-http-0.50.10
- old
+ new
@@ -19,161 +19,19 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require 'protocol/http2/stream'
-require_relative '../../body/writable'
+require_relative 'input'
+require_relative 'output'
+
module Async
module HTTP
module Protocol
module HTTP2
class Stream < ::Protocol::HTTP2::Stream
- # A writable body which requests window updates when data is read from it.
- class Input < Body::Writable
- def initialize(stream, length)
- super(length)
-
- @stream = stream
- @remaining = length
- end
-
- def read
- if chunk = super
- # If we read a chunk fron the stream, we want to extend the window if required so more data will be provided.
- @stream.request_window_update
- end
-
- # We track the expected length and check we got what we were expecting.
- if @remaining
- if chunk
- @remaining -= chunk.bytesize
- elsif @remaining > 0
- raise EOFError, "Expected #{self.length} bytes, #{@remaining} bytes short!"
- elsif @remaining < 0
- raise EOFError, "Expected #{self.length} bytes, #{@remaining} bytes over!"
- end
- end
-
- return chunk
- end
- end
-
- class Output
- def self.for(stream, body)
- output = self.new(stream, body)
-
- output.start
-
- return output
- end
-
- def initialize(stream, body)
- @stream = stream
- @body = body
-
- @window_updated = Async::Condition.new
- end
-
- def start(parent: Task.current)
- if @body.respond_to?(:call)
- @task = parent.async(&self.method(:stream))
- else
- @task = parent.async(&self.method(:passthrough))
- end
- end
-
- def stop(error)
- # Ensure that invoking #close doesn't try to close the stream.
- @stream = nil
-
- @task&.stop
- end
-
- def write(chunk)
- until chunk.empty?
- maximum_size = @stream.available_frame_size
-
- while maximum_size <= 0
- @window_updated.wait
-
- maximum_size = @stream.available_frame_size
- end
-
- break unless chunk = send_data(chunk, maximum_size)
- end
- end
-
- def window_updated(size)
- @window_updated.signal
- end
-
- def close(error = nil)
- if @stream
- if error
- @stream.close(error)
- else
- self.close_write
- end
-
- @stream = nil
- end
- end
-
- def close_write
- @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM)
- end
-
- private
-
- def stream(task)
- task.annotate("Streaming #{@body} to #{@stream}.")
-
- input = @stream.wait_for_input
-
- @body.call(Body::Stream.new(input, self))
- rescue Async::Stop
- # Ignore.
- end
-
- # Reads chunks from the given body and writes them to the stream as fast as possible.
- def passthrough(task)
- task.annotate("Writing #{@body} to #{@stream}.")
-
- while chunk = @body&.read
- self.write(chunk)
- # TODO this reduces memory usage?
- # chunk.clear unless chunk.frozen?
- # GC.start
- end
-
- self.close_write
- rescue Async::Stop
- # Ignore.
- ensure
- @body&.close($!)
- @body = nil
- end
-
- # Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.
- # @param maximum_size [Integer] send up to this many bytes of data.
- # @param stream [Stream] the stream to use for sending data frames.
- # @return [String, nil] any data that could not be written.
- def send_data(chunk, maximum_size)
- if chunk.bytesize <= maximum_size
- @stream.send_data(chunk, maximum_size: maximum_size)
- else
- @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size)
-
- # The window was not big enough to send all the data, so we save it for next time:
- return chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
- end
-
- return nil
- end
- end
-
def initialize(*)
super
@headers = nil
@trailers = nil
@@ -277,17 +135,32 @@
# Set the body and begin sending it.
def send_body(body)
@output = Output.for(self, body)
end
+ # Called when the output terminates normally.
+ def finish_output(error = nil)
+ @output = nil
+
+ if error
+ send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
+ else
+ send_data(nil, ::Protocol::HTTP2::END_STREAM)
+ end
+ end
+
def window_updated(size)
super
@output&.window_updated(size)
end
- def close(error = nil)
+ # When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen:
+ # - A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task.
+ # - A frame is sent which causes this stream to enter the closed state. This method will be invoked from that task.
+ # While the input stream is relatively straight forward, the output stream can trigger the second case above
+ def closed(error)
super
if @input
@input.close(error)
@input = nil
@@ -295,9 +168,11 @@
if @output
@output.stop(error)
@output = nil
end
+
+ return self
end
end
end
end
end