lib/http/2/stream.rb in http-2-0.12.0 vs lib/http/2/stream.rb in http-2-1.0.0
- old
+ new
@@ -72,23 +72,28 @@
# @param exclusive [Boolean]
# @param window [Integer]
# @param parent [Stream]
# @param state [Symbol]
def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle)
+ stream_error(:protocol_error, msg: "stream can't depend on itself") if id == dependency
+
@connection = connection
@id = id
@weight = weight
@dependency = dependency
- process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive)
+ process_priority(weight: weight, dependency: dependency, exclusive: exclusive)
@local_window_max_size = connection.local_settings[:settings_initial_window_size]
- @local_window = connection.local_settings[:settings_initial_window_size]
+ @local_window = connection.local_settings[:settings_initial_window_size]
@remote_window = connection.remote_settings[:settings_initial_window_size]
@parent = parent
@state = state
@error = false
@closed = false
- @send_buffer = []
+ @_method = @_content_length = @_status_code = nil
+ @_waiting_on_trailers = false
+ @received_data = false
+ @activated = false
on(:window) { |v| @remote_window = v }
on(:local_window) { |v| @local_window_max_size = @local_window = v }
end
@@ -102,22 +107,50 @@
def receive(frame)
transition(frame, false)
case frame[:type]
when :data
+ # 6.1. DATA
+ # If a DATA frame is received whose stream is not in "open" or
+ # "half closed (local)" state, the recipient MUST respond with a
+ # stream error (Section 5.4.2) of type STREAM_CLOSED.
+ stream_error(:stream_closed) unless @state == :open ||
+ @state == :half_closed_local ||
+ @state == :half_closing || @state == :closing ||
+ (@state == :closed && @closed == :local_rst)
+ @received_data = true
+ calculate_content_length(frame[:length])
update_local_window(frame)
# Emit DATA frame
emit(:data, frame[:payload]) unless frame[:ignore]
calculate_window_update(@local_window_max_size)
when :headers
+ stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) ||
+ @state == :remote_closed
+ @_method ||= frame[:method]
+ @_status_code ||= frame[:status]
+ @_content_length ||= frame[:content_length]
+ @_trailers ||= frame[:trailer]
+ if @_waiting_on_trailers ||
+ (@received_data &&
+ (!@_status_code || @_status_code >= 200))
+
+ # An endpoint that receives a HEADERS frame without the END_STREAM flag set after receiving a final
+ # (non-informational) status code MUST treat the corresponding request or response as malformed.
+ verify_trailers(frame)
+ end
emit(:headers, frame[:payload]) unless frame[:ignore]
+ @_waiting_on_trailers = !@_trailers.nil?
when :push_promise
emit(:promise_headers, frame[:payload]) unless frame[:ignore]
+ when :continuation
+ stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) || @state == :remote_closed
+ stream_error(:protocol_error) if @received_data
when :priority
process_priority(frame)
when :window_update
- process_window_update(frame)
+ process_window_update(frame: frame)
when :altsvc
# 4. The ALTSVC HTTP/2 Frame
# An ALTSVC frame on a
# stream other than stream 0 containing non-empty "Origin" information
# is invalid and MUST be ignored.
@@ -128,10 +161,33 @@
complete_transition(frame)
end
alias << receive
+ def verify_trailers(frame)
+ stream_error(:protocol_error, msg: "trailer headers frame must close the stream") unless end_stream?(frame)
+ return unless @_trailers
+
+ trailers = frame[:payload]
+ return unless trailers.respond_to?(:each)
+
+ trailers.each do |field, _| # rubocop:disable Style/HashEachMethods
+ @_trailers.delete(field)
+ break if @_trailers.empty?
+ end
+ stream_error(:protocol_error, msg: "didn't receive all expected trailer headers") unless @_trailers.empty?
+ end
+
+ def calculate_content_length(data_length)
+ return unless @_content_length && data_length
+
+ @_content_length -= data_length
+ return if @_content_length >= 0
+
+ stream_error(:protocol_error, msg: "received more data than what was defined in content-length")
+ end
+
# Processes outgoing HTTP 2.0 frames. Data frames may be automatically
# split and buffered based on maximum frame size and current stream flow
# control window size.
#
# @param frame [Hash]
@@ -161,17 +217,17 @@
# @param end_headers [Boolean] indicates that no more headers will be sent
# @param end_stream [Boolean] indicates that no payload will be sent
def headers(headers, end_headers: true, end_stream: false)
flags = []
flags << :end_headers if end_headers
- flags << :end_stream if end_stream
+ flags << :end_stream if end_stream || @_method == "HEAD"
send(type: :headers, flags: flags, payload: headers)
end
def promise(headers, end_headers: true, &block)
- raise ArgumentError, 'must provide callback' unless block_given?
+ raise ArgumentError, "must provide callback" unless block
flags = end_headers ? [:end_headers] : []
emit(:promise, self, headers, flags, &block)
end
@@ -180,11 +236,11 @@
#
# @param weight [Integer] new stream weight value
# @param dependency [Integer] new stream dependency stream
def reprioritize(weight: 16, dependency: 0, exclusive: false)
stream_error if @id.even?
- send(type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive)
+ send(type: :priority, weight: weight, dependency: dependency, exclusive: exclusive)
end
# Sends DATA frame containing response payload.
#
# @param payload [String]
@@ -548,47 +604,61 @@
def event(newstate)
case newstate
when :open
@state = newstate
- emit(:active)
+ activate_stream_in_conn
when :reserved_local, :reserved_remote
@state = newstate
emit(:reserved)
when :half_closed_local, :half_closed_remote
@closed = newstate
- emit(:active) unless @state == :open
+ activate_stream_in_conn unless @state == :open
@state = :half_closing
when :local_closed, :remote_closed, :local_rst, :remote_rst
@closed = newstate
@state = :closing
end
@state
end
+ # Streams that are in the "open" state, or either of the "half closed"
+ # states count toward the maximum number of streams that an endpoint is
+ # permitted to open.
+ def activate_stream_in_conn
+ @connection.active_stream_count += 1
+ @activated = true
+ emit(:active)
+ end
+
+ def close_stream_in_conn(*args)
+ @connection.active_stream_count -= 1 if @activated
+ emit(:close, *args)
+ end
+
def complete_transition(frame)
case @state
when :closing
@state = :closed
- emit(:close, frame[:error])
+ close_stream_in_conn(frame[:error])
when :half_closing
@state = @closed
emit(:half_close)
end
end
def process_priority(frame)
@weight = frame[:weight]
- @dependency = frame[:stream_dependency]
+ @dependency = frame[:dependency]
emit(
:priority,
weight: frame[:weight],
- dependency: frame[:stream_dependency],
+ dependency: frame[:dependency],
exclusive: frame[:exclusive]
)
# TODO: implement dependency tree housekeeping
# Latest draft defines a fairly complex priority control.
# See https://tools.ietf.org/html/draft-ietf-httpbis-http2-16#section-5.3
@@ -597,22 +667,21 @@
end
def end_stream?(frame)
case frame[:type]
when :data, :headers, :continuation
- return false unless frame[:flags]
-
- frame[:flags].include?(:end_stream)
+ frame[:flags] && frame[:flags].include?(:end_stream)
else false
end
end
def stream_error(error = :internal_error, msg: nil)
+ # if the stream already broke with an error, ignore subsequent
+
@error = error
close(error) if @state != :closed
- klass = error.to_s.split('_').map(&:capitalize).join
- raise Error.const_get(klass), msg
+ raise Error.types[error], msg
end
alias error stream_error
def manage_state(frame)
transition(frame, true)