lib/httpx/connection/http2.rb in httpx-0.6.7 vs lib/httpx/connection/http2.rb in httpx-0.7.0

- old
+ new

@@ -6,54 +6,66 @@ module HTTPX class Connection::HTTP2 include Callbacks include Loggable + MAX_CONCURRENT_REQUESTS = HTTP2Next::DEFAULT_MAX_CONCURRENT_STREAMS + Error = Class.new(Error) do def initialize(id, code) super("stream #{id} closed with error: #{code}") end end attr_reader :streams, :pending def initialize(buffer, options) @options = Options.new(options) - @max_concurrent_requests = @options.max_concurrent_requests + @max_concurrent_requests = @options.max_concurrent_requests || MAX_CONCURRENT_REQUESTS + @max_requests = @options.max_requests || 0 @pending = [] @streams = {} @drains = {} @buffer = buffer @handshake_completed = false init_connection end def close - @connection.goaway + @connection.goaway unless @connection.state == :closed end def empty? @connection.state == :closed || @streams.empty? end + def exhausted? + @connection.active_stream_count >= @max_requests + end + def <<(data) @connection << data end - def send(request, **) + def send(request) if !@handshake_completed || - @streams.size >= @max_concurrent_requests + @streams.size >= @max_concurrent_requests || + @streams.size >= @max_requests @pending << request return end unless (stream = @streams[request]) stream = @connection.new_stream handle_stream(stream, request) @streams[request] = stream + @max_requests -= 1 end handle(request, stream) true + rescue HTTP2Next::Error::StreamLimitExceeded + @pending.unshift(request) + emit(:exhausted) end def consume @streams.each do |request, stream| handle(request, stream) @@ -93,10 +105,11 @@ end end def init_connection @connection = HTTP2Next::Client.new(@options.http2_settings) + @connection.max_streams = @max_requests if @connection.respond_to?(:max_streams=) && @max_requests.positive? @connection.on(:frame, &method(:on_frame)) @connection.on(:frame_sent, &method(:on_frame_sent)) @connection.on(:frame_received, &method(:on_frame_received)) @connection.on(:origin, &method(:on_origin)) @connection.on(:promise, &method(:on_promise)) @@ -193,19 +206,26 @@ end log(level: 2, label: "#{stream.id}: ") { "closing stream" } @streams.delete(request) send(@pending.shift) unless @pending.empty? + return unless @streams.empty? && exhausted? + + close + emit(:close) + emit(:exhausted) unless @pending.empty? end def on_frame(bytes) @buffer << bytes end def on_settings(*) @handshake_completed = true - @max_concurrent_requests = [@max_concurrent_requests, - @connection.remote_settings[:settings_max_concurrent_streams]].min + + @max_requests = [@max_requests, @connection.remote_settings[:settings_max_concurrent_streams]].max + + @max_concurrent_requests = [@max_concurrent_requests, @max_requests].min send_pending end def on_close(_last_frame, error, _payload) if error && error != :no_error