lib/http/2/stream.rb in http-2-0.6.3 vs lib/http/2/stream.rb in http-2-0.7.0
- old
+ new
@@ -22,11 +22,11 @@
# | +----------+ +--------+ +----------+ |
# | | :active | | :active | |
# | | ,-------|:active |-------. | |
# | | H / ES | | ES \ H | |
# | v v +--------+ v v |
- # | +-----------+ | +-_---------+ |
+ # | +-----------+ | +-----------+ |
# | |:half_close| | |:half_close| |
# | | (remote) | | | (local) | |
# | +-----------+ | +-----------+ |
# | | v | |
# | | ES/R +--------+ ES/R | |
@@ -47,14 +47,17 @@
# Request parent stream of push stream.
attr_reader :parent
# Stream priority as set by initiator.
- attr_reader :priority
+ attr_reader :weight
+ attr_reader :dependency
# Size of current stream flow control window.
- attr_reader :window
+ attr_reader :local_window
+ attr_reader :remote_window
+ alias :window :local_window
# Reason why connection was closed.
attr_reader :closed
# Initializes new stream.
@@ -62,47 +65,51 @@
# Note that you should never have to call this directly. To create a new
# client initiated stream, use Connection#new_stream. Similarly, Connection
# will emit new stream objects, when new stream frames are received.
#
# @param id [Integer]
- # @param priority [Integer]
+ # @param weight [Integer]
+ # @param dependency [Integer]
+ # @param exclusive [Boolean]
# @param window [Integer]
# @param parent [Stream]
- def initialize(id, priority, window, parent = nil)
- @id = id
- @priority = priority
- @window = window
+ def initialize(connection: nil, id: nil, weight: 16, dependency: 0, exclusive: false, parent: nil)
+ @connection = connection or raise ArgumentError.new("missing mandatory argument connection")
+ @id = id or raise ArgumentError.new("missing mandatory argument id")
+ @weight = weight
+ @dependency = dependency
+ process_priority({weight: weight, stream_dependency: dependency, exclusive: exclusive})
+ @remote_window = connection.remote_settings[:settings_initial_window_size]
@parent = parent
@state = :idle
@error = false
@closed = false
@send_buffer = []
- on(:window) { |v| @window = v }
+ on(:window) { |v| @remote_window = v }
+ on(:local_window) { |v| @local_window = v }
end
# Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.
#
# @param frame [Hash]
def receive(frame)
transition(frame, false)
case frame[:type]
when :data
+ # TODO: when receiving DATA, keep track of local_window.
emit(:data, frame[:payload]) if !frame[:ignore]
when :headers, :push_promise
- if frame[:payload].is_a? Array
- emit(:headers, Hash[*frame[:payload].flatten]) if !frame[:ignore]
- else
- emit(:headers, frame[:payload]) if !frame[:ignore]
- end
+ emit(:headers, frame[:payload]) if !frame[:ignore]
when :priority
- @priority = frame[:priority]
- emit(:priority, @priority)
+ process_priority(frame)
when :window_update
- @window += frame[:increment]
+ @remote_window += frame[:increment]
send_data
+ when :altsvc, :blocked
+ emit(frame[:type], frame)
end
complete_transition(frame)
end
alias :<< :receive
@@ -114,11 +121,11 @@
# @param frame [Hash]
def send(frame)
transition(frame, true)
frame[:stream] ||= @id
- @priority = frame[:priority] if frame[:type] == :priority
+ process_priority(frame) if frame[:type] == :priority
if frame[:type] == :data
send_data(frame)
else
emit(:frame, frame)
@@ -127,47 +134,51 @@
complete_transition(frame)
end
# Sends a HEADERS frame containing HTTP response headers.
#
- # @param headers [Hash]
+ # @param headers [Array or Hash] Array of key-value pairs or Hash
# @param end_headers [Boolean] indicates that no more headers will be sent
# @param end_stream [Boolean] indicates that no payload will be sent
def headers(headers, end_headers: true, end_stream: false)
flags = []
flags << :end_headers if end_headers
flags << :end_stream if end_stream
send({type: :headers, flags: flags, payload: headers.to_a})
end
- def promise(headers, end_push_promise: true, &block)
+ def promise(headers, end_headers: true, &block)
raise Exception.new("must provide callback") if !block_given?
- flags = end_push_promise ? [:end_push_promise] : []
+ flags = end_headers ? [:end_headers] : []
emit(:promise, self, headers, flags, &block)
end
# Sends a PRIORITY frame with new stream priority value (can only be
# performed by the client).
#
- # @param p [Integer] new stream priority value
- def reprioritize(p)
+ # @param weight [Integer] new stream weight value
+ # @param dependency [Integer] new stream dependency stream
+ def reprioritize(weight: 16, dependency: 0, exclusive: false)
stream_error if @id.even?
- send({type: :priority, priority: p})
+ send({type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive})
end
# Sends DATA frame containing response payload.
#
# @param payload [String]
# @param end_stream [Boolean] indicates last response DATA frame
def data(payload, end_stream: true)
flags = []
flags << :end_stream if end_stream
- while payload.bytesize > MAX_FRAME_SIZE do
- chunk = payload.slice!(0, MAX_FRAME_SIZE)
+ # Split data according to each frame is smaller enough
+ # TODO: consider padding?
+ max_size = @connection.remote_settings[:settings_max_frame_size]
+ while payload.bytesize > max_size do
+ chunk = payload.slice!(0, max_size)
send({type: :data, payload: chunk})
end
send({type: :data, flags: flags, payload: payload})
end
@@ -342,22 +353,27 @@
# A receiver can ignore WINDOW_UPDATE or PRIORITY frames in this
# state. These frame types might arrive for a short period after a
# frame bearing the END_STREAM flag is sent.
when :half_closed_local
if sending
- if frame[:type] == :rst_stream
+ case frame[:type]
+ when :rst_stream
event(:local_rst)
+ when :priority
+ process_priority(frame)
else
stream_error
end
else
case frame[:type]
when :data, :headers, :continuation
event(:remote_closed) if end_stream?(frame)
when :rst_stream then event(:remote_rst)
- when :window_update, :priority
- frame[:igore] = true
+ when :priority
+ process_priority(frame)
+ when :window_update
+ frame[:ignore] = true
end
end
# A stream that is "half closed (remote)" is no longer being used by
# the peer to send frames. In this state, an endpoint is no longer
@@ -378,10 +394,12 @@
end
else
case frame[:type]
when :rst_stream then event(:remote_rst)
when :window_update then frame[:ignore] = true
+ when :priority
+ process_priority(frame)
else stream_error(:stream_closed); end
end
# An endpoint MUST NOT send frames on a closed stream. An endpoint
# that receives a frame after receiving a RST_STREAM or a frame
@@ -405,19 +423,25 @@
# be used to close any of those streams.
when :closed
if sending
case frame[:type]
when :rst_stream then # ignore
+ when :priority then
+ process_priority(frame)
else
stream_error(:stream_closed) if !(frame[:type] == :rst_stream)
end
else
- case @closed
- when :remote_rst, :remote_closed
- stream_error(:stream_closed) if !(frame[:type] == :rst_stream)
- when :local_rst, :local_closed
- frame[:ignore] = true
+ if frame[:type] == :priority
+ process_priority(frame)
+ else
+ case @closed
+ when :remote_rst, :remote_closed
+ stream_error(:stream_closed) if !(frame[:type] == :rst_stream)
+ when :local_rst, :local_closed
+ frame[:ignore] = true
+ end
end
end
end
end
@@ -453,10 +477,24 @@
@state = @closed
emit(:half_close)
end
end
+ def process_priority(frame)
+ @weight = frame[:weight]
+ @dependency = frame[:stream_dependency]
+ emit(:priority,
+ weight: frame[:weight],
+ dependency: frame[:stream_dependency],
+ exclusive: frame[:exclusive])
+ # TODO: implement dependency tree housekeeping
+ # Latest draft defines a fairly complex priority control.
+ # See https://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-5.3
+ # We currently have no prioritization among streams.
+ # We should add code here.
+ end
+
def end_stream?(frame)
case frame[:type]
when :data, :headers, :continuation
frame[:flags].include?(:end_stream)
else false; end
@@ -467,8 +505,7 @@
close(error) if @state != :closed
klass = error.to_s.split('_').map(&:capitalize).join
raise Error.const_get(klass).new(msg)
end
-
end
end