lib/http/2/stream.rb in http-2-0.8.4 vs lib/http/2/stream.rb in http-2-0.9.0
- old
+ new
@@ -69,46 +69,46 @@
# @param weight [Integer]
# @param dependency [Integer]
# @param exclusive [Boolean]
# @param window [Integer]
# @param parent [Stream]
- def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil)
+ # @param state [Symbol]
+ def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle)
@connection = connection
@id = id
@weight = weight
@dependency = dependency
process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive)
+ @local_window_max_size = connection.local_settings[:settings_initial_window_size]
@local_window = connection.local_settings[:settings_initial_window_size]
@remote_window = connection.remote_settings[:settings_initial_window_size]
@parent = parent
- @state = :idle
+ @state = state
@error = false
@closed = false
@send_buffer = []
on(:window) { |v| @remote_window = v }
- on(:local_window) { |v| @local_window = v }
+ on(:local_window) { |v| @local_window_max_size = @local_window = v }
end
# Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.
#
# @param frame [Hash]
def receive(frame)
transition(frame, false)
case frame[:type]
when :data
- window_size = frame[:payload].bytesize
- window_size += frame[:padding] || 0
- @local_window -= window_size
+ update_local_window(frame)
+ # Emit DATA frame
emit(:data, frame[:payload]) unless frame[:ignore]
-
- # Automatically send WINDOW_UPDATE,
- # assuming that emit(:data) can now receive next data
- window_update(window_size) if window_size > 0
- when :headers, :push_promise
+ calculate_window_update(@local_window_max_size)
+ when :headers
emit(:headers, frame[:payload]) unless frame[:ignore]
+ when :push_promise
+ emit(:promise_headers, frame[:payload]) unless frame[:ignore]
when :priority
process_priority(frame)
when :window_update
process_window_update(frame)
when :altsvc, :blocked
@@ -151,11 +151,11 @@
def headers(headers, end_headers: true, end_stream: false)
flags = []
flags << :end_headers if end_headers
flags << :end_stream if end_stream
- send(type: :headers, flags: flags, payload: headers.to_a)
+ send(type: :headers, flags: flags, payload: headers)
end
def promise(headers, end_headers: true, &block)
fail ArgumentError, 'must provide callback' unless block_given?
@@ -226,12 +226,10 @@
# Sends a WINDOW_UPDATE frame to the peer.
#
# @param increment [Integer]
def window_update(increment)
- # always emit connection-level WINDOW_UPDATE
- emit(:window_update, increment)
# emit stream-level WINDOW_UPDATE unless stream is closed
return if @state == :closed || @state == :remote_closed
send(type: :window_update, increment: increment)
end
@@ -600,9 +598,10 @@
close(error) if @state != :closed
klass = error.to_s.split('_').map(&:capitalize).join
fail Error.const_get(klass), msg
end
+ alias error stream_error
def manage_state(frame)
transition(frame, true)
frame[:stream] ||= @id
yield