lib/http/2/stream.rb in http-2-0.8.4 vs lib/http/2/stream.rb in http-2-0.9.0

- old
+ new

@@ -69,46 +69,46 @@ # @param weight [Integer] # @param dependency [Integer] # @param exclusive [Boolean] # @param window [Integer] # @param parent [Stream] - def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil) + # @param state [Symbol] + def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) @connection = connection @id = id @weight = weight @dependency = dependency process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive) + @local_window_max_size = connection.local_settings[:settings_initial_window_size] @local_window = connection.local_settings[:settings_initial_window_size] @remote_window = connection.remote_settings[:settings_initial_window_size] @parent = parent - @state = :idle + @state = state @error = false @closed = false @send_buffer = [] on(:window) { |v| @remote_window = v } - on(:local_window) { |v| @local_window = v } + on(:local_window) { |v| @local_window_max_size = @local_window = v } end # Processes incoming HTTP 2.0 frames. The frames must be decoded upstream. # # @param frame [Hash] def receive(frame) transition(frame, false) case frame[:type] when :data - window_size = frame[:payload].bytesize - window_size += frame[:padding] || 0 - @local_window -= window_size + update_local_window(frame) + # Emit DATA frame emit(:data, frame[:payload]) unless frame[:ignore] - - # Automatically send WINDOW_UPDATE, - # assuming that emit(:data) can now receive next data - window_update(window_size) if window_size > 0 - when :headers, :push_promise + calculate_window_update(@local_window_max_size) + when :headers emit(:headers, frame[:payload]) unless frame[:ignore] + when :push_promise + emit(:promise_headers, frame[:payload]) unless frame[:ignore] when :priority process_priority(frame) when :window_update process_window_update(frame) when :altsvc, :blocked @@ -151,11 +151,11 @@ def headers(headers, end_headers: true, end_stream: false) flags = [] flags << :end_headers if end_headers flags << :end_stream if end_stream - send(type: :headers, flags: flags, payload: headers.to_a) + send(type: :headers, flags: flags, payload: headers) end def promise(headers, end_headers: true, &block) fail ArgumentError, 'must provide callback' unless block_given? @@ -226,12 +226,10 @@ # Sends a WINDOW_UPDATE frame to the peer. # # @param increment [Integer] def window_update(increment) - # always emit connection-level WINDOW_UPDATE - emit(:window_update, increment) # emit stream-level WINDOW_UPDATE unless stream is closed return if @state == :closed || @state == :remote_closed send(type: :window_update, increment: increment) end @@ -600,9 +598,10 @@ close(error) if @state != :closed klass = error.to_s.split('_').map(&:capitalize).join fail Error.const_get(klass), msg end + alias error stream_error def manage_state(frame) transition(frame, true) frame[:stream] ||= @id yield