lib/http/2/connection.rb in http-2-0.6.3 vs lib/http/2/connection.rb in http-2-0.7.0

- old
+ new

@@ -1,15 +1,40 @@ module HTTP2 # Default connection and stream flow control window (64KB). DEFAULT_FLOW_WINDOW = 65535 + # Default header table size + DEFAULT_HEADER_SIZE = 4096 + + # Default stream_limit + DEFAULT_MAX_CONCURRENT_STREAMS = 100 + + # Default values for SETTINGS frame, as defined by the spec. + SPEC_DEFAULT_CONNECTION_SETTINGS = { + settings_header_table_size: 4096, + settings_enable_push: 1, # enabled for servers + settings_max_concurrent_streams: Framer::MAX_STREAM_ID, # unlimited + settings_initial_window_size: 65535, + settings_max_frame_size: 16384, + settings_max_header_list_size: 2**31 - 1, # unlimited + }.freeze + + DEFAULT_CONNECTION_SETTINGS = { + settings_header_table_size: 4096, + settings_enable_push: 1, # enabled for servers + settings_max_concurrent_streams: 100, + settings_initial_window_size: 65535, # + settings_max_frame_size: 16384, + settings_max_header_list_size: 2**31 - 1, # unlimited + }.freeze + # Default stream priority (lower values are higher priority). - DEFAULT_PRIORITY = 2**30 + DEFAULT_WEIGHT = 16 # Default connection "fast-fail" preamble string as defined by the spec. - CONNECTION_HEADER = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + CONNECTION_PREFACE_MAGIC = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" # Connection encapsulates all of the connection, stream, flow-control, # error management, and other processing logic required for a well-behaved # HTTP 2.0 endpoint. # @@ -26,31 +51,46 @@ # Last connection error if connection is aborted. attr_reader :error # Size of current connection flow control window (by default, set to # infinity, but is automatically updated on receipt of peer settings). - attr_reader :window + attr_reader :local_window + attr_reader :remote_window + alias :window :local_window - # Maximum number of concurrent streams allowed by the peer (automatically - # updated on receipt of peer settings). - attr_reader :stream_limit + # Current settings value for local and peer + attr_reader :local_settings + attr_reader :remote_settings + # Pending settings value + # Sent but not ack'ed settings + attr_reader :pending_settings + # Number of active streams between client and server (reserved streams # are not counted towards the stream limit). attr_reader :active_stream_count # Initializes new connection object. # - def initialize(streams: 100, window: DEFAULT_FLOW_WINDOW) - @stream_limit = streams + def initialize(**settings) + @local_settings = DEFAULT_CONNECTION_SETTINGS.merge(settings) + @remote_settings = SPEC_DEFAULT_CONNECTION_SETTINGS.dup + + @compressor = Header::Compressor.new(settings) + @decompressor = Header::Decompressor.new(settings) + @active_stream_count = 0 @streams = {} + @pending_settings = [] @framer = Framer.new - @window = window - @window_limit = window + @local_window_limit = @local_settings[:settings_initial_window_size] + @local_window = @local_window_limit + @remote_window_limit = @remote_settings[:settings_initial_window_size] + @remote_window = @remote_window_limit + @recv_buffer = Buffer.new @send_buffer = [] @continuation = [] @error = nil end @@ -58,15 +98,15 @@ # Allocates new stream for current connection. # # @param priority [Integer] # @param window [Integer] # @param parent [Stream] - def new_stream(priority: DEFAULT_PRIORITY, parent: nil) + def new_stream(**args) raise ConnectionClosed.new if @state == :closed - raise StreamLimitExceeded.new if @active_stream_count == @stream_limit + raise StreamLimitExceeded.new if @active_stream_count >= @remote_settings[:settings_max_concurrent_streams] - stream = activate_stream(@stream_id, priority, parent) + stream = activate_stream(id: @stream_id, **args) @stream_id += 2 stream end @@ -74,11 +114,11 @@ # # @param payload [String] optional payload must be 8 bytes long # @param blk [Proc] callback to execute when PONG is received def ping(payload, &blk) send({type: :ping, stream: 0, payload: payload}) - once(:pong, &blk) if blk + once(:ack, &blk) if blk end # Sends a GOAWAY frame indicating that the peer should stop creating # new streams for current connection. # @@ -95,24 +135,21 @@ error: error, payload: payload }) @state = :closed end - # Sends a connection SETTINGS frame to the peer. Setting window size - # to Float::INFINITY disables flow control. + # Sends a connection SETTINGS frame to the peer. + # The values are reflected when the corresponding ACK is received. # - # @param stream_limit [Integer] maximum number of concurrent streams - # @param window_limit [Float] maximum flow window size - def settings(stream_limit: @stream_limit, window_limit: @window_limit) - payload = { settings_max_concurrent_streams: stream_limit } - if window_limit.to_f.infinite? - payload[:settings_flow_control_options] = 1 - else - payload[:settings_initial_window_size] = window_limit - end - + # @param settings [Array or Hash] + def settings(payload) + payload = payload.to_a + check = validate_settings(@local_role, payload) + check and connection_error + @pending_settings << payload send({type: :settings, stream: 0, payload: payload}) + @pending_settings << payload end # Decodes incoming bytes into HTTP 2.0 frames and routes them to # appropriate receivers: connection frames are handled directly, and # stream frames are passed to appropriate stream objects. @@ -126,27 +163,31 @@ # connection header as a final confirmation and to establish the # initial settings for the HTTP/2.0 connection. # # Client connection header is 24 byte connection header followed by # SETTINGS frame. Server connection header is SETTINGS frame only. - if @state == :new + if @state == :waiting_magic if @recv_buffer.size < 24 - if !CONNECTION_HEADER.start_with? @recv_buffer + if !CONNECTION_PREFACE_MAGIC.start_with? @recv_buffer raise HandshakeError.new else - return + return # maybe next time end - elsif @recv_buffer.read(24) != CONNECTION_HEADER + elsif @recv_buffer.read(24) != CONNECTION_PREFACE_MAGIC raise HandshakeError.new else - @state = :connection_header - settings(stream_limit: @stream_limit, window_limit: @window_limit) + # MAGIC is OK. Send our settings + @state = :waiting_connection_preface + payload = @local_settings.select {|k,v| v != SPEC_DEFAULT_CONNECTION_SETTINGS[k]} + settings(payload) end end while frame = @framer.parse(@recv_buffer) do + emit(:frame_received, frame) + # Header blocks MUST be transmitted as a contiguous sequence of frames # with no interleaved frames of any other type, or from any other stream. if !@continuation.empty? if frame[:type] != :continuation || frame[:stream] != @continuation.first[:stream] @@ -154,25 +195,18 @@ end @continuation << frame return if !frame[:flags].include? :end_headers - headers = @continuation.collect do |chunk| - decode_headers(chunk) - chunk[:payload] - end.flatten(1) + payload = @continuation.map {|f| f[:payload]}.join frame = @continuation.shift @continuation.clear frame.delete(:length) - frame[:payload] = headers - frame[:flags] << if frame[:type] == :push_promise - :end_push_promise - else - :end_headers - end + frame[:payload] = Buffer.new(payload) + frame[:flags] << :end_headers end # SETTINGS frames always apply to a connection, never a single stream. # The stream identifier for a settings frame MUST be zero. If an # endpoint receives a SETTINGS frame whose stream identifier field is @@ -198,21 +232,23 @@ decode_headers(frame) return if @state == :closed stream = @streams[frame[:stream]] if stream.nil? - stream = activate_stream(frame[:stream], - frame[:priority] || DEFAULT_PRIORITY) + stream = activate_stream(id: frame[:stream], + weight: frame[:weight] || DEFAULT_WEIGHT, + dependency: frame[:dependency] || 0, + exclusive: frame[:exclusive] || false) emit(:stream, stream) end stream << frame when :push_promise # The last frame in a sequence of PUSH_PROMISE/CONTINUATION - # frames MUST have the END_PUSH_PROMISE/END_HEADERS flag set - if !frame[:flags].include? :end_push_promise + # frames MUST have the END_HEADERS flag set + if !frame[:flags].include? :end_headers @continuation << frame return end decode_headers(frame) @@ -246,11 +282,11 @@ else connection_error end end - stream = activate_stream(pid, DEFAULT_PRIORITY, parent) + stream = activate_stream(id: pid, parent: parent) emit(:promise, stream) stream << frame else if stream = @streams[frame[:stream]] stream << frame @@ -261,11 +297,11 @@ end end end end - rescue + rescue => e connection_error end alias :<< :receive private @@ -275,10 +311,11 @@ # All other frames are sent immediately. # # @note all frames are currently delivered in FIFO order. # @param frame [Hash] def send(frame) + emit(:frame_sent, frame) if frame[:type] == :data send_data(frame, true) else # An endpoint can end a connection at any time. In particular, an @@ -286,26 +323,32 @@ if frame[:type] == :rst_stream if frame[:error] == :protocol_error goaway(frame[:error]) end else - emit(:frame, encode(frame)) + # HEADERS and PUSH_PROMISE may generate CONTINUATION + frames = encode(frame) + frames.each {|f| emit(:frame, f) } end end end # Applies HTTP 2.0 binary encoding to the frame. # # @param frame [Hash] - # @return [Buffer] encoded frame + # @return [Array of Buffer] encoded frame def encode(frame) + frames = [] + if frame[:type] == :headers || frame[:type] == :push_promise - encode_headers(frame) + frames = encode_headers(frame) # HEADERS and PUSH_PROMISE may create more than one frame + else + frames = [frame] # otherwise one frame end - @framer.generate(frame) + frames.map {|f| @framer.generate(f) } end # Check if frame is a connection frame: SETTINGS, PING, GOAWAY, and any # frame addressed to stream ID = 0. # @@ -325,86 +368,189 @@ # - Mark connection as closed on GOAWAY # # @param frame [Hash] def connection_management(frame) case @state - when :connection_header - # SETTINGS frames MUST be sent at the start of a connection. - connection_settings(frame) + when :waiting_connection_preface + # The first frame MUST be a SETTINGS frame at the start of a connection. @state = :connected + connection_settings(frame) when :connected case frame[:type] when :settings connection_settings(frame) when :window_update - flow_control_allowed? - @window += frame[:increment] + @remote_window += frame[:increment] send_data(nil, true) when :ping - if frame[:flags].include? :pong - emit(:pong, frame[:payload]) + if frame[:flags].include? :ack + emit(:ack, frame[:payload]) else send({ type: :ping, stream: 0, - flags: [:pong], payload: frame[:payload] + flags: [:ack], payload: frame[:payload] }) end when :goaway # Receivers of a GOAWAY frame MUST NOT open additional streams on # the connection, although a new connection can be established # for new streams. @state = :closed emit(:goaway, frame[:last_stream], frame[:error], frame[:payload]) - + when :altsvc, :blocked + emit(frame[:type], frame) else connection_error end else connection_error end end - # Update local connection settings based on parameters set by the peer. + # Validate settings parameters. See sepc Section 6.5.2. # + # @param role [Symbol] The sender's role: :client or :server + # @return nil if no error. Exception object in case of any error. + def validate_settings(role, settings) + settings.each do |key,v| + case key + when :settings_header_table_size + # Any value is valid + when :settings_enable_push + case role + when :server + # Section 8.2 + # Clients MUST reject any attempt to change the + # SETTINGS_ENABLE_PUSH setting to a value other than 0 by treating the + # message as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. + unless v == 0 + return ProtocolError.new("invalid #{key} value") + end + when :client + # Any value other than 0 or 1 MUST be treated as a + # connection error (Section 5.4.1) of type PROTOCOL_ERROR. + unless v == 0 || v == 1 + return ProtocolError.new("invalid #{key} value") + end + end + when :settings_max_concurrent_streams + # Any value is valid + when :settings_initial_window_size + # Values above the maximum flow control window size of 2^31-1 MUST + # be treated as a connection error (Section 5.4.1) of type + # FLOW_CONTROL_ERROR. + unless v <= 0x7fffffff + return FlowControlError.new("invalid #{key} value") + end + when :settings_max_frame_size + # The initial value is 2^14 (16,384) octets. The value advertised + # by an endpoint MUST be between this initial value and the maximum + # allowed frame size (2^24-1 or 16,777,215 octets), inclusive. + # Values outside this range MUST be treated as a connection error + # (Section 5.4.1) of type PROTOCOL_ERROR. + unless 16384 <= v && v <= 16777215 + return ProtocolError.new("invalid #{key} value") + end + when :settings_max_header_list_size + # Any value is valid + else + # ignore unknown settings + end + end + nil + end + + # Update connection settings based on parameters set by the peer. + # # @param frame [Hash] def connection_settings(frame) if (frame[:type] != :settings || frame[:stream] != 0) connection_error end - frame[:payload].each do |key,v| + # Apply settings. + # side = + # local: previously sent and pended our settings should be effective + # remote: just received peer settings should immediately be effective + settings, side = \ + if frame[:flags].include?(:ack) + # Process pending settings we have sent. + [@pending_settings.shift, :local] + else + check = validate_settings(@remote_role, frame[:payload]) + check and connection_error(check) + [frame[:payload], :remote] + end + + settings.each do |key,v| + case side + when :local + @local_settings[key] = v + when :remote + @remote_settings[key] = v + end + case key when :settings_max_concurrent_streams - @stream_limit = v + # Do nothing. + # The value controls at the next attempt of stream creation. - # A change to SETTINGS_INITIAL_WINDOW_SIZE could cause the available - # space in a flow control window to become negative. A sender MUST - # track the negative flow control window, and MUST NOT send new flow - # controlled frames until it receives WINDOW_UPDATE frames that cause - # the flow control window to become positive. when :settings_initial_window_size - flow_control_allowed? - @window = @window - @window_limit + v - @streams.each do |id, stream| - stream.emit(:window, stream.window - @window_limit + v) - end + # A change to SETTINGS_INITIAL_WINDOW_SIZE could cause the available + # space in a flow control window to become negative. A sender MUST + # track the negative flow control window, and MUST NOT send new flow + # controlled frames until it receives WINDOW_UPDATE frames that cause + # the flow control window to become positive. + case side + when :local + @local_window = @local_window - @local_window_limit + v + @streams.each do |id, stream| + stream.emit(:local_window, stream.local_window - @local_window_limit + v) + end - @window_limit = v + @local_window_limit = v + when :remote + @remote_window = @remote_window - @remote_window_limit + v + @streams.each do |id, stream| + # Event name is :window, not :remote_window + stream.emit(:window, stream.remote_window - @remote_window_limit + v) + end - # Flow control can be disabled the entire connection using the - # SETTINGS_FLOW_CONTROL_OPTIONS setting. This setting ends all forms - # of flow control. An implementation that does not wish to perform - # flow control can use this in the initial SETTINGS exchange. - when :settings_flow_control_options - flow_control_allowed? + @remote_window_limit = v + end - if v == 1 - @window = @window_limit = Float::INFINITY + when :settings_header_table_size + # Setting header table size might cause some headers evicted + case side + when :local + @decompressor.set_table_size(v) + when :remote + @compressor.set_table_size(v) end + + when :settings_enable_push + # nothing to do + + when :settings_max_frame_size + # nothing to do + + else + # ignore unknown settings end end + + case side + when :local + # Received a settings_ack. Notify application layer. + emit(:settings_ack, frame, @pending_settings.size) + when :remote + if @state != :closed + # Send ack to peer + send({type: :settings, stream: 0, payload: [], flags: [:ack]}) + end + end end # Decode headers payload and update connection decompressor state. # # The receiver endpoint reassembles the header block by concatenating @@ -423,39 +569,52 @@ end # Encode headers payload and update connection compressor state. # # @param frame [Hash] + # @return [Array of Frame] def encode_headers(frame) - if !frame[:payload].is_a? String - frame[:payload] = @compressor.encode(frame[:payload]) + payload = frame[:payload] + unless payload.is_a? String + payload = @compressor.encode(payload) end - rescue Exception => e - connection_error(:compression_error, msg: e.message) - end + frames = [] - # Once disabled, no further flow control operations are permitted. - # - def flow_control_allowed? - if @window_limit == Float::INFINITY - connection_error(:flow_control_error) + while payload.size > 0 + cont = frame.dup + cont[:type] = :continuation + cont[:flags] = [] + cont[:payload] = payload.slice!(0, @remote_settings[:settings_max_frame_size]) + frames << cont end + if frames.empty? + frames = [frame] + else + frames.first[:type] = frame[:type] + frames.first[:flags] = frame[:flags] - [:end_headers] + frames.last[:flags] << :end_headers + end + + frames + + rescue Exception => e + [connection_error(:compression_error, msg: e.message)] end # Activates new incoming or outgoing stream and registers appropriate # connection managemet callbacks. # # @param id [Integer] # @param priority [Integer] # @param window [Integer] # @param parent [Stream] - def activate_stream(id, priority, parent = nil) + def activate_stream(id: nil, **args) if @streams.key?(id) connection_error(msg: 'Stream ID already exists') end - stream = Stream.new(id, priority, @window_limit, parent) + stream = Stream.new({connection: self, id: id}.merge(args)) # Streams that are in the "open" state, or either of the "half closed" # states count toward the maximum number of streams that an endpoint is # permitted to open. stream.once(:active) { @active_stream_count += 1 }