lib/http/2/stream.rb in http-2-0.12.0 vs lib/http/2/stream.rb in http-2-1.0.0

- old
+ new

@@ -72,23 +72,28 @@ # @param exclusive [Boolean] # @param window [Integer] # @param parent [Stream] # @param state [Symbol] def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) + stream_error(:protocol_error, msg: "stream can't depend on itself") if id == dependency + @connection = connection @id = id @weight = weight @dependency = dependency - process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive) + process_priority(weight: weight, dependency: dependency, exclusive: exclusive) @local_window_max_size = connection.local_settings[:settings_initial_window_size] - @local_window = connection.local_settings[:settings_initial_window_size] + @local_window = connection.local_settings[:settings_initial_window_size] @remote_window = connection.remote_settings[:settings_initial_window_size] @parent = parent @state = state @error = false @closed = false - @send_buffer = [] + @_method = @_content_length = @_status_code = nil + @_waiting_on_trailers = false + @received_data = false + @activated = false on(:window) { |v| @remote_window = v } on(:local_window) { |v| @local_window_max_size = @local_window = v } end @@ -102,22 +107,50 @@ def receive(frame) transition(frame, false) case frame[:type] when :data + # 6.1. DATA + # If a DATA frame is received whose stream is not in "open" or + # "half closed (local)" state, the recipient MUST respond with a + # stream error (Section 5.4.2) of type STREAM_CLOSED. + stream_error(:stream_closed) unless @state == :open || + @state == :half_closed_local || + @state == :half_closing || @state == :closing || + (@state == :closed && @closed == :local_rst) + @received_data = true + calculate_content_length(frame[:length]) update_local_window(frame) # Emit DATA frame emit(:data, frame[:payload]) unless frame[:ignore] calculate_window_update(@local_window_max_size) when :headers + stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) || + @state == :remote_closed + @_method ||= frame[:method] + @_status_code ||= frame[:status] + @_content_length ||= frame[:content_length] + @_trailers ||= frame[:trailer] + if @_waiting_on_trailers || + (@received_data && + (!@_status_code || @_status_code >= 200)) + + # An endpoint that receives a HEADERS frame without the END_STREAM flag set after receiving a final + # (non-informational) status code MUST treat the corresponding request or response as malformed. + verify_trailers(frame) + end emit(:headers, frame[:payload]) unless frame[:ignore] + @_waiting_on_trailers = !@_trailers.nil? when :push_promise emit(:promise_headers, frame[:payload]) unless frame[:ignore] + when :continuation + stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) || @state == :remote_closed + stream_error(:protocol_error) if @received_data when :priority process_priority(frame) when :window_update - process_window_update(frame) + process_window_update(frame: frame) when :altsvc # 4. The ALTSVC HTTP/2 Frame # An ALTSVC frame on a # stream other than stream 0 containing non-empty "Origin" information # is invalid and MUST be ignored. @@ -128,10 +161,33 @@ complete_transition(frame) end alias << receive + def verify_trailers(frame) + stream_error(:protocol_error, msg: "trailer headers frame must close the stream") unless end_stream?(frame) + return unless @_trailers + + trailers = frame[:payload] + return unless trailers.respond_to?(:each) + + trailers.each do |field, _| # rubocop:disable Style/HashEachMethods + @_trailers.delete(field) + break if @_trailers.empty? + end + stream_error(:protocol_error, msg: "didn't receive all expected trailer headers") unless @_trailers.empty? + end + + def calculate_content_length(data_length) + return unless @_content_length && data_length + + @_content_length -= data_length + return if @_content_length >= 0 + + stream_error(:protocol_error, msg: "received more data than what was defined in content-length") + end + # Processes outgoing HTTP 2.0 frames. Data frames may be automatically # split and buffered based on maximum frame size and current stream flow # control window size. # # @param frame [Hash] @@ -161,17 +217,17 @@ # @param end_headers [Boolean] indicates that no more headers will be sent # @param end_stream [Boolean] indicates that no payload will be sent def headers(headers, end_headers: true, end_stream: false) flags = [] flags << :end_headers if end_headers - flags << :end_stream if end_stream + flags << :end_stream if end_stream || @_method == "HEAD" send(type: :headers, flags: flags, payload: headers) end def promise(headers, end_headers: true, &block) - raise ArgumentError, 'must provide callback' unless block_given? + raise ArgumentError, "must provide callback" unless block flags = end_headers ? [:end_headers] : [] emit(:promise, self, headers, flags, &block) end @@ -180,11 +236,11 @@ # # @param weight [Integer] new stream weight value # @param dependency [Integer] new stream dependency stream def reprioritize(weight: 16, dependency: 0, exclusive: false) stream_error if @id.even? - send(type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive) + send(type: :priority, weight: weight, dependency: dependency, exclusive: exclusive) end # Sends DATA frame containing response payload. # # @param payload [String] @@ -548,47 +604,61 @@ def event(newstate) case newstate when :open @state = newstate - emit(:active) + activate_stream_in_conn when :reserved_local, :reserved_remote @state = newstate emit(:reserved) when :half_closed_local, :half_closed_remote @closed = newstate - emit(:active) unless @state == :open + activate_stream_in_conn unless @state == :open @state = :half_closing when :local_closed, :remote_closed, :local_rst, :remote_rst @closed = newstate @state = :closing end @state end + # Streams that are in the "open" state, or either of the "half closed" + # states count toward the maximum number of streams that an endpoint is + # permitted to open. + def activate_stream_in_conn + @connection.active_stream_count += 1 + @activated = true + emit(:active) + end + + def close_stream_in_conn(*args) + @connection.active_stream_count -= 1 if @activated + emit(:close, *args) + end + def complete_transition(frame) case @state when :closing @state = :closed - emit(:close, frame[:error]) + close_stream_in_conn(frame[:error]) when :half_closing @state = @closed emit(:half_close) end end def process_priority(frame) @weight = frame[:weight] - @dependency = frame[:stream_dependency] + @dependency = frame[:dependency] emit( :priority, weight: frame[:weight], - dependency: frame[:stream_dependency], + dependency: frame[:dependency], exclusive: frame[:exclusive] ) # TODO: implement dependency tree housekeeping # Latest draft defines a fairly complex priority control. # See https://tools.ietf.org/html/draft-ietf-httpbis-http2-16#section-5.3 @@ -597,22 +667,21 @@ end def end_stream?(frame) case frame[:type] when :data, :headers, :continuation - return false unless frame[:flags] - - frame[:flags].include?(:end_stream) + frame[:flags] && frame[:flags].include?(:end_stream) else false end end def stream_error(error = :internal_error, msg: nil) + # if the stream already broke with an error, ignore subsequent + @error = error close(error) if @state != :closed - klass = error.to_s.split('_').map(&:capitalize).join - raise Error.const_get(klass), msg + raise Error.types[error], msg end alias error stream_error def manage_state(frame) transition(frame, true)