lib/async/http/protocol/http2/stream.rb in async-http-0.46.2 vs lib/async/http/protocol/http2/stream.rb in async-http-0.46.3
- old
+ new
@@ -17,17 +17,35 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require 'protocol/http2/stream'
+require_relative '../../body/writable'
module Async
module HTTP
module Protocol
module HTTP2
class Stream < ::Protocol::HTTP2::Stream
- class Buffer
+ class Input < Body::Writable
+ def initialize(stream, length)
+ super(length)
+
+ @stream = stream
+ end
+
+ def read
+ if chunk = super
+ # If we read a chunk fron the stream, we want to extend the window if required so more data will be provided.
+ @stream.request_window_update
+ end
+
+ return chunk
+ end
+ end
+
+ class Output
def initialize(stream, body, task: Task.current)
@stream = stream
@body = body
@remainder = nil
@@ -35,10 +53,11 @@
@window_updated = Async::Condition.new
@task = task.async(&self.method(:passthrough))
end
+ # Reads chunks from the given body and writes them to the stream as fast as possible.
def passthrough(task)
while chunk = self.read
maximum_size = @stream.available_frame_size
while maximum_size <= 0
@@ -111,15 +130,15 @@
super
@headers = nil
@trailers = nil
- # Input buffer (receive_data):
+ # Input buffer, reading request body, or response body (receive_data):
@length = nil
@input = nil
- # Output buffer (window_updated):
+ # Output buffer, writing request body or response body (window_updated):
@output = nil
end
attr_accessor :headers
@@ -163,10 +182,25 @@
Async.logger.error(self, error)
send_reset_stream(error.code)
end
+ def prepare_input(length)
+ if @input.nil?
+ @input = Input.new(self, length)
+ else
+ raise ArgumentError, "Input body already prepared!"
+ end
+ end
+
+ def update_local_window(frame)
+ consume_local_window(frame)
+
+ # This is done on demand.
+ # request_window_update
+ end
+
def process_data(frame)
data = frame.unpack
if @input
unless data.empty?
@@ -186,10 +220,10 @@
send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end
# Set the body and begin sending it.
def send_body(body)
- @output = Buffer.new(self, body)
+ @output = Output.new(self, body)
end
def window_updated(size)
super