lib/async/http/protocol/http2/stream.rb in async-http-0.45.4 vs lib/async/http/protocol/http2/stream.rb in async-http-0.45.6
- old
+ new
@@ -23,139 +23,193 @@
module Async
module HTTP
module Protocol
module HTTP2
class Stream < ::Protocol::HTTP2::Stream
- def initialize(delegate, *args)
- super(*args)
+ class Buffer
+ def initialize(stream, body, task: Task.current)
+ @stream = stream
+
+ @body = body
+ @remainder = nil
+
+ @window_updated = Async::Condition.new
+
+ @task = task.async(&self.method(:passthrough))
+ end
- @delegate = delegate
+ def passthrough(task)
+ while chunk = self.read
+ maximum_size = @stream.available_frame_size
+
+ while maximum_size <= 0
+ @window_updated.wait
+
+ maximum_size = @stream.available_frame_size
+ end
+
+ self.send_data(chunk, maximum_size)
+ end
+
+ self.end_stream
+ rescue Async::Stop
+ # Ignore.
+ ensure
+ @body&.close($!)
+ @body = nil
+ end
- # This is the body that is being sent.
- @body = nil
+ def read
+ if @remainder
+ remainder = @remainder
+ @remainder = nil
+
+ return remainder
+ else
+ @body.read
+ end
+ end
- # The remainder of the current chunk being sent.
- @remainder = nil
+ def push(chunk)
+ @remainder = chunk
+ end
- # The task that is handling sending the body.
- @task = nil
- end
-
- attr_accessor :delegate
- attr :body
-
- def create_push_promise_stream(headers)
- @delegate.create_push_promise_stream(headers)
- end
-
- def accept_push_promise_stream(headers, stream_id)
- @delegate.accept_push_promise_stream(headers, stream_id)
- end
-
- def send_body(body, task: Async::Task.current)
- # TODO Might need to stop this task when body is cancelled.
- @task = task.async do |subtask|
- subtask.annotate "Sending body: #{body.class}"
-
- @body = body
-
- window_updated
+ # Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.
+ # @param maximum_size [Integer] send up to this many bytes of data.
+ # @param stream [Stream] the stream to use for sending data frames.
+ def send_data(chunk, maximum_size)
+ if chunk.bytesize <= maximum_size
+ @stream.send_data(chunk, maximum_size: maximum_size)
+ else
+ @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size)
+
+ # The window was not big enough to send all the data, so we save it for next time:
+ self.push(
+ chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
+ )
+ end
end
- end
-
- def send_chunk
- maximum_size = self.available_frame_size
- if maximum_size == 0
- return false
+ def end_stream
+ @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM)
end
- if @remainder
- chunk = @remainder
- @remainder = nil
- elsif chunk = @body.read
- # There was a new chunk of data to send
- else
+ def window_updated(size)
+ @window_updated.signal
+ end
+
+ def close(error)
if @body
- @body.close
+ @body.close(error)
@body = nil
end
- # @body.read above might take a while and a stream reset might be received in the mean time.
- unless closed? or @connection.closed?
- send_data(nil, ::Protocol::HTTP2::END_STREAM)
- end
-
- return false
+ @task&.stop
end
+ end
+
+ def initialize(*)
+ super
- return false if closed?
+ @headers = nil
+ @trailers = nil
- if chunk.bytesize <= maximum_size
- send_data(chunk, maximum_size: maximum_size)
+ # Input buffer (receive_data):
+ @length = nil
+ @input = nil
+
+ # Output buffer (window_updated):
+ @output = nil
+ end
+
+ attr_accessor :headers
+
+ def add_header(key, value)
+ if key == CONNECTION
+ raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!"
+ elsif key.start_with? ':'
+ raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!"
+ elsif key =~ /[A-Z]/
+ raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!"
else
- send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size)
-
- @remainder = chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
+ @headers.add(key, value)
end
-
- return true
end
- def window_updated
- return unless @body
-
- while send_chunk
- # There could be more data to send...
+ def add_trailer(key, value)
+ if @trailers.include(key)
+ add_header(key, value)
+ else
+ raise ::Protocol::HTTP2::HeaderError, "Cannot add trailer #{key} as it was not specified in trailers!"
end
end
+ def receive_trailing_headers(headers, end_stream)
+ headers.each do |key, value|
+ add_trailer(key, value)
+ end
+ end
+
def receive_headers(frame)
- headers = super
+ if @headers.nil?
+ @headers = ::Protocol::HTTP::Headers.new
+ self.receive_initial_headers(super, frame.end_stream?)
+ @trailers = @headers[TRAILERS]
+ elsif @trailers and frame.end_stream?
+ self.receive_trailing_headers(super, frame.end_stream?)
+ else
+ raise ::Protocol::HTTP2::HeaderError, "Unable to process headers!"
+ end
+ rescue ::Protocol::HTTP2::HeaderError => error
+ Async.logger.error(self, error)
- @delegate.receive_headers(self, headers, frame.end_stream?)
-
- return headers
+ send_reset_stream(error.code)
end
- def receive_data(frame)
- data = super
+ def process_data(frame)
+ data = frame.unpack
- if data
- @delegate.receive_data(self, data, frame.end_stream?)
+ if @input
+ unless data.empty?
+ @input.write(data)
+ end
+
+ if frame.end_stream?
+ @input.close
+ @input = nil
+ end
end
return data
+ rescue ::Protocol::HTTP2::ProtocolError
+ raise
+ rescue # Anything else...
+ send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end
- def receive_reset_stream(frame)
- error_code = super
-
- if @body
- @body.close(EOFError.new(error_code))
- @body = nil
- end
-
- @delegate.receive_reset_stream(self, error_code)
-
- return error_code
+ # Set the body and begin sending it.
+ def send_body(body)
+ @output = Buffer.new(self, body)
end
- def close!
- @delegate.close!
-
+ def window_updated(size)
super
+
+ @output&.window_updated(size)
end
def close(error = nil)
super
- if @body
- @body.close(error)
- @body = nil
+ if @input
+ @input.close(error)
+ @input = nil
end
- @delegate.stream_closed(error)
+ if @output
+ @output.close(error)
+ @output = nil
+ end
end
end
end
end
end