lib/http/2/connection.rb in http-2-0.7.0 vs lib/http/2/connection.rb in http-2-0.8.0

- old
+ new

@@ -1,9 +1,8 @@ module HTTP2 - # Default connection and stream flow control window (64KB). - DEFAULT_FLOW_WINDOW = 65535 + DEFAULT_FLOW_WINDOW = 65_535 # Default header table size DEFAULT_HEADER_SIZE = 4096 # Default stream_limit @@ -12,36 +11,38 @@ # Default values for SETTINGS frame, as defined by the spec. SPEC_DEFAULT_CONNECTION_SETTINGS = { settings_header_table_size: 4096, settings_enable_push: 1, # enabled for servers settings_max_concurrent_streams: Framer::MAX_STREAM_ID, # unlimited - settings_initial_window_size: 65535, - settings_max_frame_size: 16384, + settings_initial_window_size: 65_535, + settings_max_frame_size: 16_384, settings_max_header_list_size: 2**31 - 1, # unlimited }.freeze DEFAULT_CONNECTION_SETTINGS = { settings_header_table_size: 4096, settings_enable_push: 1, # enabled for servers settings_max_concurrent_streams: 100, - settings_initial_window_size: 65535, # - settings_max_frame_size: 16384, + settings_initial_window_size: 65_535, # + settings_max_frame_size: 16_384, settings_max_header_list_size: 2**31 - 1, # unlimited }.freeze # Default stream priority (lower values are higher priority). DEFAULT_WEIGHT = 16 # Default connection "fast-fail" preamble string as defined by the spec. - CONNECTION_PREFACE_MAGIC = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + CONNECTION_PREFACE_MAGIC = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".freeze # Connection encapsulates all of the connection, stream, flow-control, # error management, and other processing logic required for a well-behaved # HTTP 2.0 endpoint. # # Note that this class should not be used directly. Instead, you want to # use either Client or Server class to drive the HTTP 2.0 exchange. + # + # rubocop:disable ClassLength class Connection include FlowBuffer include Emitter include Error @@ -53,11 +54,11 @@ # Size of current connection flow control window (by default, set to # infinity, but is automatically updated on receipt of peer settings). attr_reader :local_window attr_reader :remote_window - alias :window :local_window + alias_method :window, :local_window # Current settings value for local and peer attr_reader :local_settings attr_reader :remote_settings @@ -99,12 +100,12 @@ # # @param priority [Integer] # @param window [Integer] # @param parent [Stream] def new_stream(**args) - raise ConnectionClosed.new if @state == :closed - raise StreamLimitExceeded.new if @active_stream_count >= @remote_settings[:settings_max_concurrent_streams] + fail ConnectionClosed if @state == :closed + fail StreamLimitExceeded if @active_stream_count >= @remote_settings[:settings_max_concurrent_streams] stream = activate_stream(id: @stream_id, **args) @stream_id += 2 stream @@ -113,11 +114,11 @@ # Sends PING frame to the peer. # # @param payload [String] optional payload must be 8 bytes long # @param blk [Proc] callback to execute when PONG is received def ping(payload, &blk) - send({type: :ping, stream: 0, payload: payload}) + send(type: :ping, stream: 0, payload: payload) once(:ack, &blk) if blk end # Sends a GOAWAY frame indicating that the peer should stop creating # new streams for current connection. @@ -128,27 +129,30 @@ # since it could contain sensitive information. # # @param error [Symbol] # @param payload [String] def goaway(error = :no_error, payload = nil) - send({ - type: :goaway, last_stream: (@streams.max.first rescue 0), - error: error, payload: payload - }) + last_stream = if (max = @streams.max) + max.first + else + 0 + end + + send(type: :goaway, last_stream: last_stream, + error: error, payload: payload) @state = :closed end # Sends a connection SETTINGS frame to the peer. # The values are reflected when the corresponding ACK is received. # # @param settings [Array or Hash] def settings(payload) payload = payload.to_a - check = validate_settings(@local_role, payload) - check and connection_error + connection_error if validate_settings(@local_role, payload) @pending_settings << payload - send({type: :settings, stream: 0, payload: payload}) + send(type: :settings, stream: 0, payload: payload) @pending_settings << payload end # Decodes incoming bytes into HTTP 2.0 frames and routes them to # appropriate receivers: connection frames are handled directly, and @@ -166,40 +170,38 @@ # Client connection header is 24 byte connection header followed by # SETTINGS frame. Server connection header is SETTINGS frame only. if @state == :waiting_magic if @recv_buffer.size < 24 if !CONNECTION_PREFACE_MAGIC.start_with? @recv_buffer - raise HandshakeError.new + fail HandshakeError else return # maybe next time end - - elsif @recv_buffer.read(24) != CONNECTION_PREFACE_MAGIC - raise HandshakeError.new - else + elsif @recv_buffer.read(24) == CONNECTION_PREFACE_MAGIC # MAGIC is OK. Send our settings @state = :waiting_connection_preface - payload = @local_settings.select {|k,v| v != SPEC_DEFAULT_CONNECTION_SETTINGS[k]} + payload = @local_settings.reject { |k, v| v == SPEC_DEFAULT_CONNECTION_SETTINGS[k] } settings(payload) + else + fail HandshakeError end end - while frame = @framer.parse(@recv_buffer) do + while (frame = @framer.parse(@recv_buffer)) emit(:frame_received, frame) # Header blocks MUST be transmitted as a contiguous sequence of frames # with no interleaved frames of any other type, or from any other stream. - if !@continuation.empty? - if frame[:type] != :continuation || - frame[:stream] != @continuation.first[:stream] + unless @continuation.empty? + unless frame[:type] == :continuation && frame[:stream] == @continuation.first[:stream] connection_error end @continuation << frame - return if !frame[:flags].include? :end_headers + return unless frame[:flags].include? :end_headers - payload = @continuation.map {|f| f[:payload]}.join + payload = @continuation.map { |f| f[:payload] }.join frame = @continuation.shift @continuation.clear frame.delete(:length) @@ -217,11 +219,11 @@ else case frame[:type] when :headers # The last frame in a sequence of HEADERS/CONTINUATION # frames MUST have the END_HEADERS flag set. - if !frame[:flags].include? :end_headers + unless frame[:flags].include? :end_headers @continuation << frame return end # After sending a GOAWAY frame, the sender can discard frames @@ -232,23 +234,25 @@ decode_headers(frame) return if @state == :closed stream = @streams[frame[:stream]] if stream.nil? - stream = activate_stream(id: frame[:stream], - weight: frame[:weight] || DEFAULT_WEIGHT, - dependency: frame[:dependency] || 0, - exclusive: frame[:exclusive] || false) + stream = activate_stream( + id: frame[:stream], + weight: frame[:weight] || DEFAULT_WEIGHT, + dependency: frame[:dependency] || 0, + exclusive: frame[:exclusive] || false, + ) emit(:stream, stream) end stream << frame when :push_promise # The last frame in a sequence of PUSH_PROMISE/CONTINUATION # frames MUST have the END_HEADERS flag set - if !frame[:flags].include? :end_headers + unless frame[:flags].include? :end_headers @continuation << frame return end decode_headers(frame) @@ -267,44 +271,61 @@ parent = @streams[frame[:stream]] pid = frame[:promise_stream] connection_error(msg: 'missing parent ID') if parent.nil? - if !(parent.state == :open || parent.state == :half_closed_local) + unless parent.state == :open || parent.state == :half_closed_local # An endpoint might receive a PUSH_PROMISE frame after it sends # RST_STREAM. PUSH_PROMISE causes a stream to become "reserved". # The RST_STREAM does not cancel any promised stream. Therefore, if # promised streams are not desired, a RST_STREAM can be used to # close any of those streams. if parent.closed == :local_rst # We can either (a) 'resurrect' the parent, or (b) RST_STREAM # ... sticking with (b), might need to revisit later. - send({type: :rst_stream, stream: pid, error: :refused_stream}) + send(type: :rst_stream, stream: pid, error: :refused_stream) else connection_error end end stream = activate_stream(id: pid, parent: parent) emit(:promise, stream) stream << frame else - if stream = @streams[frame[:stream]] + if (stream = @streams[frame[:stream]]) stream << frame else - # An endpoint that receives an unexpected stream identifier - # MUST respond with a connection error of type PROTOCOL_ERROR. - connection_error + # The PRIORITY frame can be sent for a stream in the "idle" or + # "closed" state. This allows for the reprioritization of a + # group of dependent streams by altering the priority of an + # unused or closed parent stream. + if frame[:type] == :priority + stream = activate_stream( + id: frame[:stream], + weight: frame[:weight] || DEFAULT_WEIGHT, + dependency: frame[:dependency] || 0, + exclusive: frame[:exclusive] || false, + ) + + emit(:stream, stream) + stream << frame + else + # An endpoint that receives an unexpected stream identifier + # MUST respond with a connection error of type PROTOCOL_ERROR. + connection_error + end end end end end rescue => e + raise if e.is_a?(Error::Error) connection_error end - alias :<< :receive + alias_method :<<, :receive private # Send an outgoing frame. DATA frames are subject to connection flow # control and may be split and / or buffered based on current window size. @@ -318,18 +339,17 @@ send_data(frame, true) else # An endpoint can end a connection at any time. In particular, an # endpoint MAY choose to treat a stream error as a connection error. - if frame[:type] == :rst_stream - if frame[:error] == :protocol_error - goaway(frame[:error]) - end + if frame[:type] == :rst_stream && frame[:error] == :protocol_error + goaway(frame[:error]) else - # HEADERS and PUSH_PROMISE may generate CONTINUATION + # HEADERS and PUSH_PROMISE may generate CONTINUATION. Also send + # RST_STREAM that are not protocol errors frames = encode(frame) - frames.each {|f| emit(:frame, f) } + frames.each { |f| emit(:frame, f) } end end end # Applies HTTP 2.0 binary encoding to the frame. @@ -337,30 +357,29 @@ # @param frame [Hash] # @return [Array of Buffer] encoded frame def encode(frame) frames = [] - if frame[:type] == :headers || - frame[:type] == :push_promise + if frame[:type] == :headers || frame[:type] == :push_promise frames = encode_headers(frame) # HEADERS and PUSH_PROMISE may create more than one frame else frames = [frame] # otherwise one frame end - frames.map {|f| @framer.generate(f) } + frames.map { |f| @framer.generate(f) } end # Check if frame is a connection frame: SETTINGS, PING, GOAWAY, and any # frame addressed to stream ID = 0. # # @param frame [Hash] # @return [Boolean] def connection_frame?(frame) frame[:stream] == 0 || - frame[:type] == :settings || - frame[:type] == :ping || - frame[:type] == :goaway + frame[:type] == :settings || + frame[:type] == :ping || + frame[:type] == :goaway end # Process received connection frame (stream ID = 0). # - Handle SETTINGS updates # - Connection flow control (WINDOW_UPDATE) @@ -384,14 +403,12 @@ send_data(nil, true) when :ping if frame[:flags].include? :ack emit(:ack, frame[:payload]) else - send({ - type: :ping, stream: 0, - flags: [:ack], payload: frame[:payload] - }) + send(type: :ping, stream: 0, + flags: [:ack], payload: frame[:payload]) end when :goaway # Receivers of a GOAWAY frame MUST NOT open additional streams on # the connection, although a new connection can be established # for new streams. @@ -410,24 +427,22 @@ # Validate settings parameters. See sepc Section 6.5.2. # # @param role [Symbol] The sender's role: :client or :server # @return nil if no error. Exception object in case of any error. def validate_settings(role, settings) - settings.each do |key,v| + settings.each do |key, v| case key when :settings_header_table_size # Any value is valid when :settings_enable_push case role when :server # Section 8.2 # Clients MUST reject any attempt to change the # SETTINGS_ENABLE_PUSH setting to a value other than 0 by treating the # message as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. - unless v == 0 - return ProtocolError.new("invalid #{key} value") - end + return ProtocolError.new("invalid #{key} value") unless v == 0 when :client # Any value other than 0 or 1 MUST be treated as a # connection error (Section 5.4.1) of type PROTOCOL_ERROR. unless v == 0 || v == 1 return ProtocolError.new("invalid #{key} value") @@ -446,45 +461,40 @@ # The initial value is 2^14 (16,384) octets. The value advertised # by an endpoint MUST be between this initial value and the maximum # allowed frame size (2^24-1 or 16,777,215 octets), inclusive. # Values outside this range MUST be treated as a connection error # (Section 5.4.1) of type PROTOCOL_ERROR. - unless 16384 <= v && v <= 16777215 + unless 16_384 <= v && v <= 16_777_215 return ProtocolError.new("invalid #{key} value") end when :settings_max_header_list_size # Any value is valid - else - # ignore unknown settings + # else # ignore unknown settings end end nil end # Update connection settings based on parameters set by the peer. # # @param frame [Hash] def connection_settings(frame) - if (frame[:type] != :settings || frame[:stream] != 0) - connection_error - end + connection_error unless frame[:type] == :settings && frame[:stream] == 0 # Apply settings. # side = # local: previously sent and pended our settings should be effective # remote: just received peer settings should immediately be effective - settings, side = \ - if frame[:flags].include?(:ack) - # Process pending settings we have sent. - [@pending_settings.shift, :local] - else - check = validate_settings(@remote_role, frame[:payload]) - check and connection_error(check) - [frame[:payload], :remote] - end + settings, side = if frame[:flags].include?(:ack) + # Process pending settings we have sent. + [@pending_settings.shift, :local] + else + connection_error(check) if validate_settings(@remote_role, frame[:payload]) + [frame[:payload], :remote] + end - settings.each do |key,v| + settings.each do |key, v| case side when :local @local_settings[key] = v when :remote @remote_settings[key] = v @@ -502,18 +512,18 @@ # controlled frames until it receives WINDOW_UPDATE frames that cause # the flow control window to become positive. case side when :local @local_window = @local_window - @local_window_limit + v - @streams.each do |id, stream| + @streams.each do |_id, stream| stream.emit(:local_window, stream.local_window - @local_window_limit + v) end @local_window_limit = v when :remote @remote_window = @remote_window - @remote_window_limit + v - @streams.each do |id, stream| + @streams.each do |_id, stream| # Event name is :window, not :remote_window stream.emit(:window, stream.remote_window - @remote_window_limit + v) end @remote_window_limit = v @@ -521,34 +531,33 @@ when :settings_header_table_size # Setting header table size might cause some headers evicted case side when :local - @decompressor.set_table_size(v) + @decompressor.table_size = v when :remote - @compressor.set_table_size(v) + @compressor.table_size = v end when :settings_enable_push # nothing to do when :settings_max_frame_size # nothing to do - else - # ignore unknown settings + # else # ignore unknown settings end end case side when :local # Received a settings_ack. Notify application layer. emit(:settings_ack, frame, @pending_settings.size) when :remote - if @state != :closed + unless @state == :closed || @h2c_upgrade == :start # Send ack to peer - send({type: :settings, stream: 0, payload: [], flags: [:ack]}) + send(type: :settings, stream: 0, payload: [], flags: [:ack]) end end end # Decode headers payload and update connection decompressor state. @@ -562,23 +571,21 @@ def decode_headers(frame) if frame[:payload].is_a? String frame[:payload] = @decompressor.decode(frame[:payload]) end - rescue Exception => e + rescue => e connection_error(:compression_error, msg: e.message) end # Encode headers payload and update connection compressor state. # # @param frame [Hash] # @return [Array of Frame] def encode_headers(frame) payload = frame[:payload] - unless payload.is_a? String - payload = @compressor.encode(payload) - end + payload = @compressor.encode(payload) unless payload.is_a? String frames = [] while payload.size > 0 cont = frame.dup @@ -590,16 +597,16 @@ if frames.empty? frames = [frame] else frames.first[:type] = frame[:type] frames.first[:flags] = frame[:flags] - [:end_headers] - frames.last[:flags] << :end_headers + frames.last[:flags] << :end_headers end frames - rescue Exception => e + rescue => e [connection_error(:compression_error, msg: e.message)] end # Activates new incoming or outgoing stream and registers appropriate # connection managemet callbacks. @@ -607,15 +614,13 @@ # @param id [Integer] # @param priority [Integer] # @param window [Integer] # @param parent [Stream] def activate_stream(id: nil, **args) - if @streams.key?(id) - connection_error(msg: 'Stream ID already exists') - end + connection_error(msg: 'Stream ID already exists') if @streams.key?(id) - stream = Stream.new({connection: self, id: id}.merge(args)) + stream = Stream.new({ connection: self, id: id }.merge(args)) # Streams that are in the "open" state, or either of the "half closed" # states count toward the maximum number of streams that an endpoint is # permitted to open. stream.once(:active) { @active_stream_count += 1 } @@ -636,14 +641,13 @@ # @option error [Symbol] :stream_closed # @option error [Symbol] :frame_too_large # @option error [Symbol] :compression_error # @param msg [String] def connection_error(error = :protocol_error, msg: nil) - goaway(error) if @state != :closed && @state != :new + goaway(error) unless @state == :closed || @state == :new @state, @error = :closed, error klass = error.to_s.split('_').map(&:capitalize).join - raise Error.const_get(klass).new(msg) + fail Error.const_get(klass), msg end - end end