lib/http/2/connection.rb in http-2-0.6.3 vs lib/http/2/connection.rb in http-2-0.7.0
- old
+ new
@@ -1,15 +1,40 @@
module HTTP2
# Default connection and stream flow control window (64KB).
DEFAULT_FLOW_WINDOW = 65535
+ # Default header table size
+ DEFAULT_HEADER_SIZE = 4096
+
+ # Default stream_limit
+ DEFAULT_MAX_CONCURRENT_STREAMS = 100
+
+ # Default values for SETTINGS frame, as defined by the spec.
+ SPEC_DEFAULT_CONNECTION_SETTINGS = {
+ settings_header_table_size: 4096,
+ settings_enable_push: 1, # enabled for servers
+ settings_max_concurrent_streams: Framer::MAX_STREAM_ID, # unlimited
+ settings_initial_window_size: 65535,
+ settings_max_frame_size: 16384,
+ settings_max_header_list_size: 2**31 - 1, # unlimited
+ }.freeze
+
+ DEFAULT_CONNECTION_SETTINGS = {
+ settings_header_table_size: 4096,
+ settings_enable_push: 1, # enabled for servers
+ settings_max_concurrent_streams: 100,
+ settings_initial_window_size: 65535, #
+ settings_max_frame_size: 16384,
+ settings_max_header_list_size: 2**31 - 1, # unlimited
+ }.freeze
+
# Default stream priority (lower values are higher priority).
- DEFAULT_PRIORITY = 2**30
+ DEFAULT_WEIGHT = 16
# Default connection "fast-fail" preamble string as defined by the spec.
- CONNECTION_HEADER = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+ CONNECTION_PREFACE_MAGIC = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
# Connection encapsulates all of the connection, stream, flow-control,
# error management, and other processing logic required for a well-behaved
# HTTP 2.0 endpoint.
#
@@ -26,31 +51,46 @@
# Last connection error if connection is aborted.
attr_reader :error
# Size of current connection flow control window (by default, set to
# infinity, but is automatically updated on receipt of peer settings).
- attr_reader :window
+ attr_reader :local_window
+ attr_reader :remote_window
+ alias :window :local_window
- # Maximum number of concurrent streams allowed by the peer (automatically
- # updated on receipt of peer settings).
- attr_reader :stream_limit
+ # Current settings value for local and peer
+ attr_reader :local_settings
+ attr_reader :remote_settings
+ # Pending settings value
+ # Sent but not ack'ed settings
+ attr_reader :pending_settings
+
# Number of active streams between client and server (reserved streams
# are not counted towards the stream limit).
attr_reader :active_stream_count
# Initializes new connection object.
#
- def initialize(streams: 100, window: DEFAULT_FLOW_WINDOW)
- @stream_limit = streams
+ def initialize(**settings)
+ @local_settings = DEFAULT_CONNECTION_SETTINGS.merge(settings)
+ @remote_settings = SPEC_DEFAULT_CONNECTION_SETTINGS.dup
+
+ @compressor = Header::Compressor.new(settings)
+ @decompressor = Header::Decompressor.new(settings)
+
@active_stream_count = 0
@streams = {}
+ @pending_settings = []
@framer = Framer.new
- @window = window
- @window_limit = window
+ @local_window_limit = @local_settings[:settings_initial_window_size]
+ @local_window = @local_window_limit
+ @remote_window_limit = @remote_settings[:settings_initial_window_size]
+ @remote_window = @remote_window_limit
+
@recv_buffer = Buffer.new
@send_buffer = []
@continuation = []
@error = nil
end
@@ -58,15 +98,15 @@
# Allocates new stream for current connection.
#
# @param priority [Integer]
# @param window [Integer]
# @param parent [Stream]
- def new_stream(priority: DEFAULT_PRIORITY, parent: nil)
+ def new_stream(**args)
raise ConnectionClosed.new if @state == :closed
- raise StreamLimitExceeded.new if @active_stream_count == @stream_limit
+ raise StreamLimitExceeded.new if @active_stream_count >= @remote_settings[:settings_max_concurrent_streams]
- stream = activate_stream(@stream_id, priority, parent)
+ stream = activate_stream(id: @stream_id, **args)
@stream_id += 2
stream
end
@@ -74,11 +114,11 @@
#
# @param payload [String] optional payload must be 8 bytes long
# @param blk [Proc] callback to execute when PONG is received
def ping(payload, &blk)
send({type: :ping, stream: 0, payload: payload})
- once(:pong, &blk) if blk
+ once(:ack, &blk) if blk
end
# Sends a GOAWAY frame indicating that the peer should stop creating
# new streams for current connection.
#
@@ -95,24 +135,21 @@
error: error, payload: payload
})
@state = :closed
end
- # Sends a connection SETTINGS frame to the peer. Setting window size
- # to Float::INFINITY disables flow control.
+ # Sends a connection SETTINGS frame to the peer.
+ # The values are reflected when the corresponding ACK is received.
#
- # @param stream_limit [Integer] maximum number of concurrent streams
- # @param window_limit [Float] maximum flow window size
- def settings(stream_limit: @stream_limit, window_limit: @window_limit)
- payload = { settings_max_concurrent_streams: stream_limit }
- if window_limit.to_f.infinite?
- payload[:settings_flow_control_options] = 1
- else
- payload[:settings_initial_window_size] = window_limit
- end
-
+ # @param settings [Array or Hash]
+ def settings(payload)
+ payload = payload.to_a
+ check = validate_settings(@local_role, payload)
+ check and connection_error
+ @pending_settings << payload
send({type: :settings, stream: 0, payload: payload})
+ @pending_settings << payload
end
# Decodes incoming bytes into HTTP 2.0 frames and routes them to
# appropriate receivers: connection frames are handled directly, and
# stream frames are passed to appropriate stream objects.
@@ -126,27 +163,31 @@
# connection header as a final confirmation and to establish the
# initial settings for the HTTP/2.0 connection.
#
# Client connection header is 24 byte connection header followed by
# SETTINGS frame. Server connection header is SETTINGS frame only.
- if @state == :new
+ if @state == :waiting_magic
if @recv_buffer.size < 24
- if !CONNECTION_HEADER.start_with? @recv_buffer
+ if !CONNECTION_PREFACE_MAGIC.start_with? @recv_buffer
raise HandshakeError.new
else
- return
+ return # maybe next time
end
- elsif @recv_buffer.read(24) != CONNECTION_HEADER
+ elsif @recv_buffer.read(24) != CONNECTION_PREFACE_MAGIC
raise HandshakeError.new
else
- @state = :connection_header
- settings(stream_limit: @stream_limit, window_limit: @window_limit)
+ # MAGIC is OK. Send our settings
+ @state = :waiting_connection_preface
+ payload = @local_settings.select {|k,v| v != SPEC_DEFAULT_CONNECTION_SETTINGS[k]}
+ settings(payload)
end
end
while frame = @framer.parse(@recv_buffer) do
+ emit(:frame_received, frame)
+
# Header blocks MUST be transmitted as a contiguous sequence of frames
# with no interleaved frames of any other type, or from any other stream.
if !@continuation.empty?
if frame[:type] != :continuation ||
frame[:stream] != @continuation.first[:stream]
@@ -154,25 +195,18 @@
end
@continuation << frame
return if !frame[:flags].include? :end_headers
- headers = @continuation.collect do |chunk|
- decode_headers(chunk)
- chunk[:payload]
- end.flatten(1)
+ payload = @continuation.map {|f| f[:payload]}.join
frame = @continuation.shift
@continuation.clear
frame.delete(:length)
- frame[:payload] = headers
- frame[:flags] << if frame[:type] == :push_promise
- :end_push_promise
- else
- :end_headers
- end
+ frame[:payload] = Buffer.new(payload)
+ frame[:flags] << :end_headers
end
# SETTINGS frames always apply to a connection, never a single stream.
# The stream identifier for a settings frame MUST be zero. If an
# endpoint receives a SETTINGS frame whose stream identifier field is
@@ -198,21 +232,23 @@
decode_headers(frame)
return if @state == :closed
stream = @streams[frame[:stream]]
if stream.nil?
- stream = activate_stream(frame[:stream],
- frame[:priority] || DEFAULT_PRIORITY)
+ stream = activate_stream(id: frame[:stream],
+ weight: frame[:weight] || DEFAULT_WEIGHT,
+ dependency: frame[:dependency] || 0,
+ exclusive: frame[:exclusive] || false)
emit(:stream, stream)
end
stream << frame
when :push_promise
# The last frame in a sequence of PUSH_PROMISE/CONTINUATION
- # frames MUST have the END_PUSH_PROMISE/END_HEADERS flag set
- if !frame[:flags].include? :end_push_promise
+ # frames MUST have the END_HEADERS flag set
+ if !frame[:flags].include? :end_headers
@continuation << frame
return
end
decode_headers(frame)
@@ -246,11 +282,11 @@
else
connection_error
end
end
- stream = activate_stream(pid, DEFAULT_PRIORITY, parent)
+ stream = activate_stream(id: pid, parent: parent)
emit(:promise, stream)
stream << frame
else
if stream = @streams[frame[:stream]]
stream << frame
@@ -261,11 +297,11 @@
end
end
end
end
- rescue
+ rescue => e
connection_error
end
alias :<< :receive
private
@@ -275,10 +311,11 @@
# All other frames are sent immediately.
#
# @note all frames are currently delivered in FIFO order.
# @param frame [Hash]
def send(frame)
+ emit(:frame_sent, frame)
if frame[:type] == :data
send_data(frame, true)
else
# An endpoint can end a connection at any time. In particular, an
@@ -286,26 +323,32 @@
if frame[:type] == :rst_stream
if frame[:error] == :protocol_error
goaway(frame[:error])
end
else
- emit(:frame, encode(frame))
+ # HEADERS and PUSH_PROMISE may generate CONTINUATION
+ frames = encode(frame)
+ frames.each {|f| emit(:frame, f) }
end
end
end
# Applies HTTP 2.0 binary encoding to the frame.
#
# @param frame [Hash]
- # @return [Buffer] encoded frame
+ # @return [Array of Buffer] encoded frame
def encode(frame)
+ frames = []
+
if frame[:type] == :headers ||
frame[:type] == :push_promise
- encode_headers(frame)
+ frames = encode_headers(frame) # HEADERS and PUSH_PROMISE may create more than one frame
+ else
+ frames = [frame] # otherwise one frame
end
- @framer.generate(frame)
+ frames.map {|f| @framer.generate(f) }
end
# Check if frame is a connection frame: SETTINGS, PING, GOAWAY, and any
# frame addressed to stream ID = 0.
#
@@ -325,86 +368,189 @@
# - Mark connection as closed on GOAWAY
#
# @param frame [Hash]
def connection_management(frame)
case @state
- when :connection_header
- # SETTINGS frames MUST be sent at the start of a connection.
- connection_settings(frame)
+ when :waiting_connection_preface
+ # The first frame MUST be a SETTINGS frame at the start of a connection.
@state = :connected
+ connection_settings(frame)
when :connected
case frame[:type]
when :settings
connection_settings(frame)
when :window_update
- flow_control_allowed?
- @window += frame[:increment]
+ @remote_window += frame[:increment]
send_data(nil, true)
when :ping
- if frame[:flags].include? :pong
- emit(:pong, frame[:payload])
+ if frame[:flags].include? :ack
+ emit(:ack, frame[:payload])
else
send({
type: :ping, stream: 0,
- flags: [:pong], payload: frame[:payload]
+ flags: [:ack], payload: frame[:payload]
})
end
when :goaway
# Receivers of a GOAWAY frame MUST NOT open additional streams on
# the connection, although a new connection can be established
# for new streams.
@state = :closed
emit(:goaway, frame[:last_stream], frame[:error], frame[:payload])
-
+ when :altsvc, :blocked
+ emit(frame[:type], frame)
else
connection_error
end
else
connection_error
end
end
- # Update local connection settings based on parameters set by the peer.
+ # Validate settings parameters. See sepc Section 6.5.2.
#
+ # @param role [Symbol] The sender's role: :client or :server
+ # @return nil if no error. Exception object in case of any error.
+ def validate_settings(role, settings)
+ settings.each do |key,v|
+ case key
+ when :settings_header_table_size
+ # Any value is valid
+ when :settings_enable_push
+ case role
+ when :server
+ # Section 8.2
+ # Clients MUST reject any attempt to change the
+ # SETTINGS_ENABLE_PUSH setting to a value other than 0 by treating the
+ # message as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
+ unless v == 0
+ return ProtocolError.new("invalid #{key} value")
+ end
+ when :client
+ # Any value other than 0 or 1 MUST be treated as a
+ # connection error (Section 5.4.1) of type PROTOCOL_ERROR.
+ unless v == 0 || v == 1
+ return ProtocolError.new("invalid #{key} value")
+ end
+ end
+ when :settings_max_concurrent_streams
+ # Any value is valid
+ when :settings_initial_window_size
+ # Values above the maximum flow control window size of 2^31-1 MUST
+ # be treated as a connection error (Section 5.4.1) of type
+ # FLOW_CONTROL_ERROR.
+ unless v <= 0x7fffffff
+ return FlowControlError.new("invalid #{key} value")
+ end
+ when :settings_max_frame_size
+ # The initial value is 2^14 (16,384) octets. The value advertised
+ # by an endpoint MUST be between this initial value and the maximum
+ # allowed frame size (2^24-1 or 16,777,215 octets), inclusive.
+ # Values outside this range MUST be treated as a connection error
+ # (Section 5.4.1) of type PROTOCOL_ERROR.
+ unless 16384 <= v && v <= 16777215
+ return ProtocolError.new("invalid #{key} value")
+ end
+ when :settings_max_header_list_size
+ # Any value is valid
+ else
+ # ignore unknown settings
+ end
+ end
+ nil
+ end
+
+ # Update connection settings based on parameters set by the peer.
+ #
# @param frame [Hash]
def connection_settings(frame)
if (frame[:type] != :settings || frame[:stream] != 0)
connection_error
end
- frame[:payload].each do |key,v|
+ # Apply settings.
+ # side =
+ # local: previously sent and pended our settings should be effective
+ # remote: just received peer settings should immediately be effective
+ settings, side = \
+ if frame[:flags].include?(:ack)
+ # Process pending settings we have sent.
+ [@pending_settings.shift, :local]
+ else
+ check = validate_settings(@remote_role, frame[:payload])
+ check and connection_error(check)
+ [frame[:payload], :remote]
+ end
+
+ settings.each do |key,v|
+ case side
+ when :local
+ @local_settings[key] = v
+ when :remote
+ @remote_settings[key] = v
+ end
+
case key
when :settings_max_concurrent_streams
- @stream_limit = v
+ # Do nothing.
+ # The value controls at the next attempt of stream creation.
- # A change to SETTINGS_INITIAL_WINDOW_SIZE could cause the available
- # space in a flow control window to become negative. A sender MUST
- # track the negative flow control window, and MUST NOT send new flow
- # controlled frames until it receives WINDOW_UPDATE frames that cause
- # the flow control window to become positive.
when :settings_initial_window_size
- flow_control_allowed?
- @window = @window - @window_limit + v
- @streams.each do |id, stream|
- stream.emit(:window, stream.window - @window_limit + v)
- end
+ # A change to SETTINGS_INITIAL_WINDOW_SIZE could cause the available
+ # space in a flow control window to become negative. A sender MUST
+ # track the negative flow control window, and MUST NOT send new flow
+ # controlled frames until it receives WINDOW_UPDATE frames that cause
+ # the flow control window to become positive.
+ case side
+ when :local
+ @local_window = @local_window - @local_window_limit + v
+ @streams.each do |id, stream|
+ stream.emit(:local_window, stream.local_window - @local_window_limit + v)
+ end
- @window_limit = v
+ @local_window_limit = v
+ when :remote
+ @remote_window = @remote_window - @remote_window_limit + v
+ @streams.each do |id, stream|
+ # Event name is :window, not :remote_window
+ stream.emit(:window, stream.remote_window - @remote_window_limit + v)
+ end
- # Flow control can be disabled the entire connection using the
- # SETTINGS_FLOW_CONTROL_OPTIONS setting. This setting ends all forms
- # of flow control. An implementation that does not wish to perform
- # flow control can use this in the initial SETTINGS exchange.
- when :settings_flow_control_options
- flow_control_allowed?
+ @remote_window_limit = v
+ end
- if v == 1
- @window = @window_limit = Float::INFINITY
+ when :settings_header_table_size
+ # Setting header table size might cause some headers evicted
+ case side
+ when :local
+ @decompressor.set_table_size(v)
+ when :remote
+ @compressor.set_table_size(v)
end
+
+ when :settings_enable_push
+ # nothing to do
+
+ when :settings_max_frame_size
+ # nothing to do
+
+ else
+ # ignore unknown settings
end
end
+
+ case side
+ when :local
+ # Received a settings_ack. Notify application layer.
+ emit(:settings_ack, frame, @pending_settings.size)
+ when :remote
+ if @state != :closed
+ # Send ack to peer
+ send({type: :settings, stream: 0, payload: [], flags: [:ack]})
+ end
+ end
end
# Decode headers payload and update connection decompressor state.
#
# The receiver endpoint reassembles the header block by concatenating
@@ -423,39 +569,52 @@
end
# Encode headers payload and update connection compressor state.
#
# @param frame [Hash]
+ # @return [Array of Frame]
def encode_headers(frame)
- if !frame[:payload].is_a? String
- frame[:payload] = @compressor.encode(frame[:payload])
+ payload = frame[:payload]
+ unless payload.is_a? String
+ payload = @compressor.encode(payload)
end
- rescue Exception => e
- connection_error(:compression_error, msg: e.message)
- end
+ frames = []
- # Once disabled, no further flow control operations are permitted.
- #
- def flow_control_allowed?
- if @window_limit == Float::INFINITY
- connection_error(:flow_control_error)
+ while payload.size > 0
+ cont = frame.dup
+ cont[:type] = :continuation
+ cont[:flags] = []
+ cont[:payload] = payload.slice!(0, @remote_settings[:settings_max_frame_size])
+ frames << cont
end
+ if frames.empty?
+ frames = [frame]
+ else
+ frames.first[:type] = frame[:type]
+ frames.first[:flags] = frame[:flags] - [:end_headers]
+ frames.last[:flags] << :end_headers
+ end
+
+ frames
+
+ rescue Exception => e
+ [connection_error(:compression_error, msg: e.message)]
end
# Activates new incoming or outgoing stream and registers appropriate
# connection managemet callbacks.
#
# @param id [Integer]
# @param priority [Integer]
# @param window [Integer]
# @param parent [Stream]
- def activate_stream(id, priority, parent = nil)
+ def activate_stream(id: nil, **args)
if @streams.key?(id)
connection_error(msg: 'Stream ID already exists')
end
- stream = Stream.new(id, priority, @window_limit, parent)
+ stream = Stream.new({connection: self, id: id}.merge(args))
# Streams that are in the "open" state, or either of the "half closed"
# states count toward the maximum number of streams that an endpoint is
# permitted to open.
stream.once(:active) { @active_stream_count += 1 }