lib/http/2/stream.rb in http-2-0.6.3 vs lib/http/2/stream.rb in http-2-0.7.0

- old
+ new

@@ -22,11 +22,11 @@ # | +----------+ +--------+ +----------+ | # | | :active | | :active | | # | | ,-------|:active |-------. | | # | | H / ES | | ES \ H | | # | v v +--------+ v v | - # | +-----------+ | +-_---------+ | + # | +-----------+ | +-----------+ | # | |:half_close| | |:half_close| | # | | (remote) | | | (local) | | # | +-----------+ | +-----------+ | # | | v | | # | | ES/R +--------+ ES/R | | @@ -47,14 +47,17 @@ # Request parent stream of push stream. attr_reader :parent # Stream priority as set by initiator. - attr_reader :priority + attr_reader :weight + attr_reader :dependency # Size of current stream flow control window. - attr_reader :window + attr_reader :local_window + attr_reader :remote_window + alias :window :local_window # Reason why connection was closed. attr_reader :closed # Initializes new stream. @@ -62,47 +65,51 @@ # Note that you should never have to call this directly. To create a new # client initiated stream, use Connection#new_stream. Similarly, Connection # will emit new stream objects, when new stream frames are received. # # @param id [Integer] - # @param priority [Integer] + # @param weight [Integer] + # @param dependency [Integer] + # @param exclusive [Boolean] # @param window [Integer] # @param parent [Stream] - def initialize(id, priority, window, parent = nil) - @id = id - @priority = priority - @window = window + def initialize(connection: nil, id: nil, weight: 16, dependency: 0, exclusive: false, parent: nil) + @connection = connection or raise ArgumentError.new("missing mandatory argument connection") + @id = id or raise ArgumentError.new("missing mandatory argument id") + @weight = weight + @dependency = dependency + process_priority({weight: weight, stream_dependency: dependency, exclusive: exclusive}) + @remote_window = connection.remote_settings[:settings_initial_window_size] @parent = parent @state = :idle @error = false @closed = false @send_buffer = [] - on(:window) { |v| @window = v } + on(:window) { |v| @remote_window = v } + on(:local_window) { |v| @local_window = v } end # Processes incoming HTTP 2.0 frames. The frames must be decoded upstream. # # @param frame [Hash] def receive(frame) transition(frame, false) case frame[:type] when :data + # TODO: when receiving DATA, keep track of local_window. emit(:data, frame[:payload]) if !frame[:ignore] when :headers, :push_promise - if frame[:payload].is_a? Array - emit(:headers, Hash[*frame[:payload].flatten]) if !frame[:ignore] - else - emit(:headers, frame[:payload]) if !frame[:ignore] - end + emit(:headers, frame[:payload]) if !frame[:ignore] when :priority - @priority = frame[:priority] - emit(:priority, @priority) + process_priority(frame) when :window_update - @window += frame[:increment] + @remote_window += frame[:increment] send_data + when :altsvc, :blocked + emit(frame[:type], frame) end complete_transition(frame) end alias :<< :receive @@ -114,11 +121,11 @@ # @param frame [Hash] def send(frame) transition(frame, true) frame[:stream] ||= @id - @priority = frame[:priority] if frame[:type] == :priority + process_priority(frame) if frame[:type] == :priority if frame[:type] == :data send_data(frame) else emit(:frame, frame) @@ -127,47 +134,51 @@ complete_transition(frame) end # Sends a HEADERS frame containing HTTP response headers. # - # @param headers [Hash] + # @param headers [Array or Hash] Array of key-value pairs or Hash # @param end_headers [Boolean] indicates that no more headers will be sent # @param end_stream [Boolean] indicates that no payload will be sent def headers(headers, end_headers: true, end_stream: false) flags = [] flags << :end_headers if end_headers flags << :end_stream if end_stream send({type: :headers, flags: flags, payload: headers.to_a}) end - def promise(headers, end_push_promise: true, &block) + def promise(headers, end_headers: true, &block) raise Exception.new("must provide callback") if !block_given? - flags = end_push_promise ? [:end_push_promise] : [] + flags = end_headers ? [:end_headers] : [] emit(:promise, self, headers, flags, &block) end # Sends a PRIORITY frame with new stream priority value (can only be # performed by the client). # - # @param p [Integer] new stream priority value - def reprioritize(p) + # @param weight [Integer] new stream weight value + # @param dependency [Integer] new stream dependency stream + def reprioritize(weight: 16, dependency: 0, exclusive: false) stream_error if @id.even? - send({type: :priority, priority: p}) + send({type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive}) end # Sends DATA frame containing response payload. # # @param payload [String] # @param end_stream [Boolean] indicates last response DATA frame def data(payload, end_stream: true) flags = [] flags << :end_stream if end_stream - while payload.bytesize > MAX_FRAME_SIZE do - chunk = payload.slice!(0, MAX_FRAME_SIZE) + # Split data according to each frame is smaller enough + # TODO: consider padding? + max_size = @connection.remote_settings[:settings_max_frame_size] + while payload.bytesize > max_size do + chunk = payload.slice!(0, max_size) send({type: :data, payload: chunk}) end send({type: :data, flags: flags, payload: payload}) end @@ -342,22 +353,27 @@ # A receiver can ignore WINDOW_UPDATE or PRIORITY frames in this # state. These frame types might arrive for a short period after a # frame bearing the END_STREAM flag is sent. when :half_closed_local if sending - if frame[:type] == :rst_stream + case frame[:type] + when :rst_stream event(:local_rst) + when :priority + process_priority(frame) else stream_error end else case frame[:type] when :data, :headers, :continuation event(:remote_closed) if end_stream?(frame) when :rst_stream then event(:remote_rst) - when :window_update, :priority - frame[:igore] = true + when :priority + process_priority(frame) + when :window_update + frame[:ignore] = true end end # A stream that is "half closed (remote)" is no longer being used by # the peer to send frames. In this state, an endpoint is no longer @@ -378,10 +394,12 @@ end else case frame[:type] when :rst_stream then event(:remote_rst) when :window_update then frame[:ignore] = true + when :priority + process_priority(frame) else stream_error(:stream_closed); end end # An endpoint MUST NOT send frames on a closed stream. An endpoint # that receives a frame after receiving a RST_STREAM or a frame @@ -405,19 +423,25 @@ # be used to close any of those streams. when :closed if sending case frame[:type] when :rst_stream then # ignore + when :priority then + process_priority(frame) else stream_error(:stream_closed) if !(frame[:type] == :rst_stream) end else - case @closed - when :remote_rst, :remote_closed - stream_error(:stream_closed) if !(frame[:type] == :rst_stream) - when :local_rst, :local_closed - frame[:ignore] = true + if frame[:type] == :priority + process_priority(frame) + else + case @closed + when :remote_rst, :remote_closed + stream_error(:stream_closed) if !(frame[:type] == :rst_stream) + when :local_rst, :local_closed + frame[:ignore] = true + end end end end end @@ -453,10 +477,24 @@ @state = @closed emit(:half_close) end end + def process_priority(frame) + @weight = frame[:weight] + @dependency = frame[:stream_dependency] + emit(:priority, + weight: frame[:weight], + dependency: frame[:stream_dependency], + exclusive: frame[:exclusive]) + # TODO: implement dependency tree housekeeping + # Latest draft defines a fairly complex priority control. + # See https://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-5.3 + # We currently have no prioritization among streams. + # We should add code here. + end + def end_stream?(frame) case frame[:type] when :data, :headers, :continuation frame[:flags].include?(:end_stream) else false; end @@ -467,8 +505,7 @@ close(error) if @state != :closed klass = error.to_s.split('_').map(&:capitalize).join raise Error.const_get(klass).new(msg) end - end end