lib/http/2/connection.rb in http-2-0.7.0 vs lib/http/2/connection.rb in http-2-0.8.0
- old
+ new
@@ -1,9 +1,8 @@
module HTTP2
-
# Default connection and stream flow control window (64KB).
- DEFAULT_FLOW_WINDOW = 65535
+ DEFAULT_FLOW_WINDOW = 65_535
# Default header table size
DEFAULT_HEADER_SIZE = 4096
# Default stream_limit
@@ -12,36 +11,38 @@
# Default values for SETTINGS frame, as defined by the spec.
SPEC_DEFAULT_CONNECTION_SETTINGS = {
settings_header_table_size: 4096,
settings_enable_push: 1, # enabled for servers
settings_max_concurrent_streams: Framer::MAX_STREAM_ID, # unlimited
- settings_initial_window_size: 65535,
- settings_max_frame_size: 16384,
+ settings_initial_window_size: 65_535,
+ settings_max_frame_size: 16_384,
settings_max_header_list_size: 2**31 - 1, # unlimited
}.freeze
DEFAULT_CONNECTION_SETTINGS = {
settings_header_table_size: 4096,
settings_enable_push: 1, # enabled for servers
settings_max_concurrent_streams: 100,
- settings_initial_window_size: 65535, #
- settings_max_frame_size: 16384,
+ settings_initial_window_size: 65_535, #
+ settings_max_frame_size: 16_384,
settings_max_header_list_size: 2**31 - 1, # unlimited
}.freeze
# Default stream priority (lower values are higher priority).
DEFAULT_WEIGHT = 16
# Default connection "fast-fail" preamble string as defined by the spec.
- CONNECTION_PREFACE_MAGIC = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+ CONNECTION_PREFACE_MAGIC = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".freeze
# Connection encapsulates all of the connection, stream, flow-control,
# error management, and other processing logic required for a well-behaved
# HTTP 2.0 endpoint.
#
# Note that this class should not be used directly. Instead, you want to
# use either Client or Server class to drive the HTTP 2.0 exchange.
+ #
+ # rubocop:disable ClassLength
class Connection
include FlowBuffer
include Emitter
include Error
@@ -53,11 +54,11 @@
# Size of current connection flow control window (by default, set to
# infinity, but is automatically updated on receipt of peer settings).
attr_reader :local_window
attr_reader :remote_window
- alias :window :local_window
+ alias_method :window, :local_window
# Current settings value for local and peer
attr_reader :local_settings
attr_reader :remote_settings
@@ -99,12 +100,12 @@
#
# @param priority [Integer]
# @param window [Integer]
# @param parent [Stream]
def new_stream(**args)
- raise ConnectionClosed.new if @state == :closed
- raise StreamLimitExceeded.new if @active_stream_count >= @remote_settings[:settings_max_concurrent_streams]
+ fail ConnectionClosed if @state == :closed
+ fail StreamLimitExceeded if @active_stream_count >= @remote_settings[:settings_max_concurrent_streams]
stream = activate_stream(id: @stream_id, **args)
@stream_id += 2
stream
@@ -113,11 +114,11 @@
# Sends PING frame to the peer.
#
# @param payload [String] optional payload must be 8 bytes long
# @param blk [Proc] callback to execute when PONG is received
def ping(payload, &blk)
- send({type: :ping, stream: 0, payload: payload})
+ send(type: :ping, stream: 0, payload: payload)
once(:ack, &blk) if blk
end
# Sends a GOAWAY frame indicating that the peer should stop creating
# new streams for current connection.
@@ -128,27 +129,30 @@
# since it could contain sensitive information.
#
# @param error [Symbol]
# @param payload [String]
def goaway(error = :no_error, payload = nil)
- send({
- type: :goaway, last_stream: (@streams.max.first rescue 0),
- error: error, payload: payload
- })
+ last_stream = if (max = @streams.max)
+ max.first
+ else
+ 0
+ end
+
+ send(type: :goaway, last_stream: last_stream,
+ error: error, payload: payload)
@state = :closed
end
# Sends a connection SETTINGS frame to the peer.
# The values are reflected when the corresponding ACK is received.
#
# @param settings [Array or Hash]
def settings(payload)
payload = payload.to_a
- check = validate_settings(@local_role, payload)
- check and connection_error
+ connection_error if validate_settings(@local_role, payload)
@pending_settings << payload
- send({type: :settings, stream: 0, payload: payload})
+ send(type: :settings, stream: 0, payload: payload)
@pending_settings << payload
end
# Decodes incoming bytes into HTTP 2.0 frames and routes them to
# appropriate receivers: connection frames are handled directly, and
@@ -166,40 +170,38 @@
# Client connection header is 24 byte connection header followed by
# SETTINGS frame. Server connection header is SETTINGS frame only.
if @state == :waiting_magic
if @recv_buffer.size < 24
if !CONNECTION_PREFACE_MAGIC.start_with? @recv_buffer
- raise HandshakeError.new
+ fail HandshakeError
else
return # maybe next time
end
-
- elsif @recv_buffer.read(24) != CONNECTION_PREFACE_MAGIC
- raise HandshakeError.new
- else
+ elsif @recv_buffer.read(24) == CONNECTION_PREFACE_MAGIC
# MAGIC is OK. Send our settings
@state = :waiting_connection_preface
- payload = @local_settings.select {|k,v| v != SPEC_DEFAULT_CONNECTION_SETTINGS[k]}
+ payload = @local_settings.reject { |k, v| v == SPEC_DEFAULT_CONNECTION_SETTINGS[k] }
settings(payload)
+ else
+ fail HandshakeError
end
end
- while frame = @framer.parse(@recv_buffer) do
+ while (frame = @framer.parse(@recv_buffer))
emit(:frame_received, frame)
# Header blocks MUST be transmitted as a contiguous sequence of frames
# with no interleaved frames of any other type, or from any other stream.
- if !@continuation.empty?
- if frame[:type] != :continuation ||
- frame[:stream] != @continuation.first[:stream]
+ unless @continuation.empty?
+ unless frame[:type] == :continuation && frame[:stream] == @continuation.first[:stream]
connection_error
end
@continuation << frame
- return if !frame[:flags].include? :end_headers
+ return unless frame[:flags].include? :end_headers
- payload = @continuation.map {|f| f[:payload]}.join
+ payload = @continuation.map { |f| f[:payload] }.join
frame = @continuation.shift
@continuation.clear
frame.delete(:length)
@@ -217,11 +219,11 @@
else
case frame[:type]
when :headers
# The last frame in a sequence of HEADERS/CONTINUATION
# frames MUST have the END_HEADERS flag set.
- if !frame[:flags].include? :end_headers
+ unless frame[:flags].include? :end_headers
@continuation << frame
return
end
# After sending a GOAWAY frame, the sender can discard frames
@@ -232,23 +234,25 @@
decode_headers(frame)
return if @state == :closed
stream = @streams[frame[:stream]]
if stream.nil?
- stream = activate_stream(id: frame[:stream],
- weight: frame[:weight] || DEFAULT_WEIGHT,
- dependency: frame[:dependency] || 0,
- exclusive: frame[:exclusive] || false)
+ stream = activate_stream(
+ id: frame[:stream],
+ weight: frame[:weight] || DEFAULT_WEIGHT,
+ dependency: frame[:dependency] || 0,
+ exclusive: frame[:exclusive] || false,
+ )
emit(:stream, stream)
end
stream << frame
when :push_promise
# The last frame in a sequence of PUSH_PROMISE/CONTINUATION
# frames MUST have the END_HEADERS flag set
- if !frame[:flags].include? :end_headers
+ unless frame[:flags].include? :end_headers
@continuation << frame
return
end
decode_headers(frame)
@@ -267,44 +271,61 @@
parent = @streams[frame[:stream]]
pid = frame[:promise_stream]
connection_error(msg: 'missing parent ID') if parent.nil?
- if !(parent.state == :open || parent.state == :half_closed_local)
+ unless parent.state == :open || parent.state == :half_closed_local
# An endpoint might receive a PUSH_PROMISE frame after it sends
# RST_STREAM. PUSH_PROMISE causes a stream to become "reserved".
# The RST_STREAM does not cancel any promised stream. Therefore, if
# promised streams are not desired, a RST_STREAM can be used to
# close any of those streams.
if parent.closed == :local_rst
# We can either (a) 'resurrect' the parent, or (b) RST_STREAM
# ... sticking with (b), might need to revisit later.
- send({type: :rst_stream, stream: pid, error: :refused_stream})
+ send(type: :rst_stream, stream: pid, error: :refused_stream)
else
connection_error
end
end
stream = activate_stream(id: pid, parent: parent)
emit(:promise, stream)
stream << frame
else
- if stream = @streams[frame[:stream]]
+ if (stream = @streams[frame[:stream]])
stream << frame
else
- # An endpoint that receives an unexpected stream identifier
- # MUST respond with a connection error of type PROTOCOL_ERROR.
- connection_error
+ # The PRIORITY frame can be sent for a stream in the "idle" or
+ # "closed" state. This allows for the reprioritization of a
+ # group of dependent streams by altering the priority of an
+ # unused or closed parent stream.
+ if frame[:type] == :priority
+ stream = activate_stream(
+ id: frame[:stream],
+ weight: frame[:weight] || DEFAULT_WEIGHT,
+ dependency: frame[:dependency] || 0,
+ exclusive: frame[:exclusive] || false,
+ )
+
+ emit(:stream, stream)
+ stream << frame
+ else
+ # An endpoint that receives an unexpected stream identifier
+ # MUST respond with a connection error of type PROTOCOL_ERROR.
+ connection_error
+ end
end
end
end
end
rescue => e
+ raise if e.is_a?(Error::Error)
connection_error
end
- alias :<< :receive
+ alias_method :<<, :receive
private
# Send an outgoing frame. DATA frames are subject to connection flow
# control and may be split and / or buffered based on current window size.
@@ -318,18 +339,17 @@
send_data(frame, true)
else
# An endpoint can end a connection at any time. In particular, an
# endpoint MAY choose to treat a stream error as a connection error.
- if frame[:type] == :rst_stream
- if frame[:error] == :protocol_error
- goaway(frame[:error])
- end
+ if frame[:type] == :rst_stream && frame[:error] == :protocol_error
+ goaway(frame[:error])
else
- # HEADERS and PUSH_PROMISE may generate CONTINUATION
+ # HEADERS and PUSH_PROMISE may generate CONTINUATION. Also send
+ # RST_STREAM that are not protocol errors
frames = encode(frame)
- frames.each {|f| emit(:frame, f) }
+ frames.each { |f| emit(:frame, f) }
end
end
end
# Applies HTTP 2.0 binary encoding to the frame.
@@ -337,30 +357,29 @@
# @param frame [Hash]
# @return [Array of Buffer] encoded frame
def encode(frame)
frames = []
- if frame[:type] == :headers ||
- frame[:type] == :push_promise
+ if frame[:type] == :headers || frame[:type] == :push_promise
frames = encode_headers(frame) # HEADERS and PUSH_PROMISE may create more than one frame
else
frames = [frame] # otherwise one frame
end
- frames.map {|f| @framer.generate(f) }
+ frames.map { |f| @framer.generate(f) }
end
# Check if frame is a connection frame: SETTINGS, PING, GOAWAY, and any
# frame addressed to stream ID = 0.
#
# @param frame [Hash]
# @return [Boolean]
def connection_frame?(frame)
frame[:stream] == 0 ||
- frame[:type] == :settings ||
- frame[:type] == :ping ||
- frame[:type] == :goaway
+ frame[:type] == :settings ||
+ frame[:type] == :ping ||
+ frame[:type] == :goaway
end
# Process received connection frame (stream ID = 0).
# - Handle SETTINGS updates
# - Connection flow control (WINDOW_UPDATE)
@@ -384,14 +403,12 @@
send_data(nil, true)
when :ping
if frame[:flags].include? :ack
emit(:ack, frame[:payload])
else
- send({
- type: :ping, stream: 0,
- flags: [:ack], payload: frame[:payload]
- })
+ send(type: :ping, stream: 0,
+ flags: [:ack], payload: frame[:payload])
end
when :goaway
# Receivers of a GOAWAY frame MUST NOT open additional streams on
# the connection, although a new connection can be established
# for new streams.
@@ -410,24 +427,22 @@
# Validate settings parameters. See sepc Section 6.5.2.
#
# @param role [Symbol] The sender's role: :client or :server
# @return nil if no error. Exception object in case of any error.
def validate_settings(role, settings)
- settings.each do |key,v|
+ settings.each do |key, v|
case key
when :settings_header_table_size
# Any value is valid
when :settings_enable_push
case role
when :server
# Section 8.2
# Clients MUST reject any attempt to change the
# SETTINGS_ENABLE_PUSH setting to a value other than 0 by treating the
# message as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
- unless v == 0
- return ProtocolError.new("invalid #{key} value")
- end
+ return ProtocolError.new("invalid #{key} value") unless v == 0
when :client
# Any value other than 0 or 1 MUST be treated as a
# connection error (Section 5.4.1) of type PROTOCOL_ERROR.
unless v == 0 || v == 1
return ProtocolError.new("invalid #{key} value")
@@ -446,45 +461,40 @@
# The initial value is 2^14 (16,384) octets. The value advertised
# by an endpoint MUST be between this initial value and the maximum
# allowed frame size (2^24-1 or 16,777,215 octets), inclusive.
# Values outside this range MUST be treated as a connection error
# (Section 5.4.1) of type PROTOCOL_ERROR.
- unless 16384 <= v && v <= 16777215
+ unless 16_384 <= v && v <= 16_777_215
return ProtocolError.new("invalid #{key} value")
end
when :settings_max_header_list_size
# Any value is valid
- else
- # ignore unknown settings
+ # else # ignore unknown settings
end
end
nil
end
# Update connection settings based on parameters set by the peer.
#
# @param frame [Hash]
def connection_settings(frame)
- if (frame[:type] != :settings || frame[:stream] != 0)
- connection_error
- end
+ connection_error unless frame[:type] == :settings && frame[:stream] == 0
# Apply settings.
# side =
# local: previously sent and pended our settings should be effective
# remote: just received peer settings should immediately be effective
- settings, side = \
- if frame[:flags].include?(:ack)
- # Process pending settings we have sent.
- [@pending_settings.shift, :local]
- else
- check = validate_settings(@remote_role, frame[:payload])
- check and connection_error(check)
- [frame[:payload], :remote]
- end
+ settings, side = if frame[:flags].include?(:ack)
+ # Process pending settings we have sent.
+ [@pending_settings.shift, :local]
+ else
+ connection_error(check) if validate_settings(@remote_role, frame[:payload])
+ [frame[:payload], :remote]
+ end
- settings.each do |key,v|
+ settings.each do |key, v|
case side
when :local
@local_settings[key] = v
when :remote
@remote_settings[key] = v
@@ -502,18 +512,18 @@
# controlled frames until it receives WINDOW_UPDATE frames that cause
# the flow control window to become positive.
case side
when :local
@local_window = @local_window - @local_window_limit + v
- @streams.each do |id, stream|
+ @streams.each do |_id, stream|
stream.emit(:local_window, stream.local_window - @local_window_limit + v)
end
@local_window_limit = v
when :remote
@remote_window = @remote_window - @remote_window_limit + v
- @streams.each do |id, stream|
+ @streams.each do |_id, stream|
# Event name is :window, not :remote_window
stream.emit(:window, stream.remote_window - @remote_window_limit + v)
end
@remote_window_limit = v
@@ -521,34 +531,33 @@
when :settings_header_table_size
# Setting header table size might cause some headers evicted
case side
when :local
- @decompressor.set_table_size(v)
+ @decompressor.table_size = v
when :remote
- @compressor.set_table_size(v)
+ @compressor.table_size = v
end
when :settings_enable_push
# nothing to do
when :settings_max_frame_size
# nothing to do
- else
- # ignore unknown settings
+ # else # ignore unknown settings
end
end
case side
when :local
# Received a settings_ack. Notify application layer.
emit(:settings_ack, frame, @pending_settings.size)
when :remote
- if @state != :closed
+ unless @state == :closed || @h2c_upgrade == :start
# Send ack to peer
- send({type: :settings, stream: 0, payload: [], flags: [:ack]})
+ send(type: :settings, stream: 0, payload: [], flags: [:ack])
end
end
end
# Decode headers payload and update connection decompressor state.
@@ -562,23 +571,21 @@
def decode_headers(frame)
if frame[:payload].is_a? String
frame[:payload] = @decompressor.decode(frame[:payload])
end
- rescue Exception => e
+ rescue => e
connection_error(:compression_error, msg: e.message)
end
# Encode headers payload and update connection compressor state.
#
# @param frame [Hash]
# @return [Array of Frame]
def encode_headers(frame)
payload = frame[:payload]
- unless payload.is_a? String
- payload = @compressor.encode(payload)
- end
+ payload = @compressor.encode(payload) unless payload.is_a? String
frames = []
while payload.size > 0
cont = frame.dup
@@ -590,16 +597,16 @@
if frames.empty?
frames = [frame]
else
frames.first[:type] = frame[:type]
frames.first[:flags] = frame[:flags] - [:end_headers]
- frames.last[:flags] << :end_headers
+ frames.last[:flags] << :end_headers
end
frames
- rescue Exception => e
+ rescue => e
[connection_error(:compression_error, msg: e.message)]
end
# Activates new incoming or outgoing stream and registers appropriate
# connection managemet callbacks.
@@ -607,15 +614,13 @@
# @param id [Integer]
# @param priority [Integer]
# @param window [Integer]
# @param parent [Stream]
def activate_stream(id: nil, **args)
- if @streams.key?(id)
- connection_error(msg: 'Stream ID already exists')
- end
+ connection_error(msg: 'Stream ID already exists') if @streams.key?(id)
- stream = Stream.new({connection: self, id: id}.merge(args))
+ stream = Stream.new({ connection: self, id: id }.merge(args))
# Streams that are in the "open" state, or either of the "half closed"
# states count toward the maximum number of streams that an endpoint is
# permitted to open.
stream.once(:active) { @active_stream_count += 1 }
@@ -636,14 +641,13 @@
# @option error [Symbol] :stream_closed
# @option error [Symbol] :frame_too_large
# @option error [Symbol] :compression_error
# @param msg [String]
def connection_error(error = :protocol_error, msg: nil)
- goaway(error) if @state != :closed && @state != :new
+ goaway(error) unless @state == :closed || @state == :new
@state, @error = :closed, error
klass = error.to_s.split('_').map(&:capitalize).join
- raise Error.const_get(klass).new(msg)
+ fail Error.const_get(klass), msg
end
-
end
end