lib/async/http/protocol/http2/stream.rb in async-http-0.46.4 vs lib/async/http/protocol/http2/stream.rb in async-http-0.46.5
- old
+ new
@@ -42,89 +42,115 @@
return chunk
end
end
class Output
- def initialize(stream, body, task: Task.current)
- @stream = stream
+ def self.for(stream, body)
+ output = self.new(stream, body)
+ output.start
+
+ return output
+ end
+
+ def initialize(stream, body)
+ @stream = stream
@body = body
- @remainder = nil
@window_updated = Async::Condition.new
+ end
+
+ def start(parent: Task.current)
+ if @body.respond_to?(:call)
+ @task = parent.async(&self.method(:stream))
+ else
+ @task = parent.async(&self.method(:passthrough))
+ end
+ end
+
+ def stop(error)
+ # Ensure that invoking #close doesn't try to close the stream.
+ @stream = nil
- @task = task.async(&self.method(:passthrough))
+ @task&.stop
end
- # Reads chunks from the given body and writes them to the stream as fast as possible.
- def passthrough(task)
- while chunk = self.read
+ def write(chunk)
+ until chunk.empty?
maximum_size = @stream.available_frame_size
while maximum_size <= 0
@window_updated.wait
maximum_size = @stream.available_frame_size
end
- self.send_data(chunk, maximum_size)
+ break unless chunk = send_data(chunk, maximum_size)
end
-
- self.end_stream
- rescue Async::Stop
- # Ignore.
- ensure
- @body&.close($!)
- @body = nil
end
- def read
- if @remainder
- remainder = @remainder
- @remainder = nil
+ def window_updated(size)
+ @window_updated.signal
+ end
+
+ def close(error = nil)
+ if @stream
+ if error
+ @stream.close(error)
+ else
+ self.close_write
+ end
- return remainder
- else
- @body&.read
+ @stream = nil
end
end
- def push(chunk)
- @remainder = chunk
+ def close_write
+ @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM)
end
+ private
+
+ def stream(task)
+ task.annotate("Streaming #{@body} to #{@stream}.")
+
+ @body.call(Body::Stream.new(@stream.input, self))
+ rescue Async::Stop
+ # Ignore.
+ end
+
+ # Reads chunks from the given body and writes them to the stream as fast as possible.
+ def passthrough(task)
+ task.annotate("Writing #{@body} to #{@stream}.")
+
+ while chunk = @body&.read
+ self.write(chunk)
+ end
+
+ self.close_write
+ rescue Async::Stop
+ # Ignore.
+ ensure
+ @body&.close($!)
+ @body = nil
+ end
+
# Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.
# @param maximum_size [Integer] send up to this many bytes of data.
# @param stream [Stream] the stream to use for sending data frames.
+ # @return [String, nil] any data that could not be written.
def send_data(chunk, maximum_size)
if chunk.bytesize <= maximum_size
@stream.send_data(chunk, maximum_size: maximum_size)
else
@stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size)
# The window was not big enough to send all the data, so we save it for next time:
- self.push(
- chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
- )
+ return chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
end
- end
-
- def end_stream
- @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM)
- end
-
- def window_updated(size)
- @window_updated.signal
- end
-
- def close(error)
- if @body
- @body.close(error)
- @body = nil
- end
- @task&.stop
+ return nil
end
end
def initialize(*)
super
@@ -140,10 +166,12 @@
@output = nil
end
attr_accessor :headers
+ attr :input
+
def add_header(key, value)
if key == CONNECTION
raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!"
elsif key.start_with? ':'
raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!"
@@ -220,11 +248,11 @@
send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end
# Set the body and begin sending it.
def send_body(body)
- @output = Output.new(self, body)
+ @output = Output.for(self, body)
end
def window_updated(size)
super
@@ -238,10 +266,10 @@
@input.close(error)
@input = nil
end
if @output
- @output.close(error)
+ @output.stop(error)
@output = nil
end
end
end
end