lib/httpx/connection/http2.rb in httpx-0.6.7 vs lib/httpx/connection/http2.rb in httpx-0.7.0
- old
+ new
@@ -6,54 +6,66 @@
module HTTPX
class Connection::HTTP2
include Callbacks
include Loggable
+ MAX_CONCURRENT_REQUESTS = HTTP2Next::DEFAULT_MAX_CONCURRENT_STREAMS
+
Error = Class.new(Error) do
def initialize(id, code)
super("stream #{id} closed with error: #{code}")
end
end
attr_reader :streams, :pending
def initialize(buffer, options)
@options = Options.new(options)
- @max_concurrent_requests = @options.max_concurrent_requests
+ @max_concurrent_requests = @options.max_concurrent_requests || MAX_CONCURRENT_REQUESTS
+ @max_requests = @options.max_requests || 0
@pending = []
@streams = {}
@drains = {}
@buffer = buffer
@handshake_completed = false
init_connection
end
def close
- @connection.goaway
+ @connection.goaway unless @connection.state == :closed
end
def empty?
@connection.state == :closed || @streams.empty?
end
+ def exhausted?
+ @connection.active_stream_count >= @max_requests
+ end
+
def <<(data)
@connection << data
end
- def send(request, **)
+ def send(request)
if !@handshake_completed ||
- @streams.size >= @max_concurrent_requests
+ @streams.size >= @max_concurrent_requests ||
+ @streams.size >= @max_requests
@pending << request
return
end
unless (stream = @streams[request])
stream = @connection.new_stream
handle_stream(stream, request)
@streams[request] = stream
+ @max_requests -= 1
end
handle(request, stream)
true
+ rescue HTTP2Next::Error::StreamLimitExceeded
+ @pending.unshift(request)
+ emit(:exhausted)
end
def consume
@streams.each do |request, stream|
handle(request, stream)
@@ -93,10 +105,11 @@
end
end
def init_connection
@connection = HTTP2Next::Client.new(@options.http2_settings)
+ @connection.max_streams = @max_requests if @connection.respond_to?(:max_streams=) && @max_requests.positive?
@connection.on(:frame, &method(:on_frame))
@connection.on(:frame_sent, &method(:on_frame_sent))
@connection.on(:frame_received, &method(:on_frame_received))
@connection.on(:origin, &method(:on_origin))
@connection.on(:promise, &method(:on_promise))
@@ -193,19 +206,26 @@
end
log(level: 2, label: "#{stream.id}: ") { "closing stream" }
@streams.delete(request)
send(@pending.shift) unless @pending.empty?
+ return unless @streams.empty? && exhausted?
+
+ close
+ emit(:close)
+ emit(:exhausted) unless @pending.empty?
end
def on_frame(bytes)
@buffer << bytes
end
def on_settings(*)
@handshake_completed = true
- @max_concurrent_requests = [@max_concurrent_requests,
- @connection.remote_settings[:settings_max_concurrent_streams]].min
+
+ @max_requests = [@max_requests, @connection.remote_settings[:settings_max_concurrent_streams]].max
+
+ @max_concurrent_requests = [@max_concurrent_requests, @max_requests].min
send_pending
end
def on_close(_last_frame, error, _payload)
if error && error != :no_error