lib/httpx/connection/http2.rb in httpx-0.13.2 vs lib/httpx/connection/http2.rb in httpx-0.14.0
- old
+ new
@@ -19,27 +19,33 @@
attr_reader :streams, :pending
def initialize(buffer, options)
@options = Options.new(options)
- @max_concurrent_requests = @options.max_concurrent_requests || MAX_CONCURRENT_REQUESTS
- @max_requests = @options.max_requests || 0
+ @settings = @options.http2_settings
@pending = []
@streams = {}
@drains = {}
@pings = []
@buffer = buffer
@handshake_completed = false
+ @wait_for_handshake = @settings.key?(:wait_for_handshake) ? @settings.delete(:wait_for_handshake) : true
+ @max_concurrent_requests = @options.max_concurrent_requests || MAX_CONCURRENT_REQUESTS
+ @max_requests = @options.max_requests || 0
init_connection
end
def interests
# waiting for WINDOW_UPDATE frames
return :r if @buffer.full?
- return :w if @connection.state == :closed
+ if @connection.state == :closed
+ return unless @handshake_completed
+ return :w
+ end
+
unless (@connection.state == :connected && @handshake_completed)
return @buffer.empty? ? :r : :rw
end
return :w if !@pending.empty? && can_buffer_more_requests?
@@ -73,13 +79,17 @@
def <<(data)
@connection << data
end
def can_buffer_more_requests?
- @handshake_completed &&
+ if @handshake_completed
@streams.size < @max_concurrent_requests &&
- @streams.size < @max_requests
+ @streams.size < @max_requests
+ else
+ !@wait_for_handshake &&
+ @streams.size < @max_concurrent_requests
+ end
end
def send(request)
unless can_buffer_more_requests?
@pending << request
@@ -138,16 +148,18 @@
catch(:buffer_full) do
request.transition(:headers)
join_headers(stream, request) if request.state == :headers
request.transition(:body)
join_body(stream, request) if request.state == :body
+ request.transition(:trailers)
+ join_trailers(stream, request) if request.state == :trailers && !request.body.empty?
request.transition(:done)
end
end
def init_connection
- @connection = HTTP2Next::Client.new(@options.http2_settings)
+ @connection = HTTP2Next::Client.new(@settings)
@connection.max_streams = @max_requests if @connection.respond_to?(:max_streams=) && @max_requests.positive?
@connection.on(:frame, &method(:on_frame))
@connection.on(:frame_sent, &method(:on_frame_sent))
@connection.on(:frame_received, &method(:on_frame_received))
@connection.on(:origin, &method(:on_origin))
@@ -167,10 +179,11 @@
alias_method :reset, :init_connection
public :reset
def handle_stream(stream, request)
+ request.on(:refuse, &method(:on_stream_refuse).curry(3)[stream, request])
stream.on(:close, &method(:on_stream_close).curry(3)[stream, request])
stream.on(:half_close) do
log(level: 2) { "#{stream.id}: waiting for response..." }
end
stream.on(:altsvc, &method(:on_altsvc).curry(2)[request.origin])
@@ -191,32 +204,51 @@
request.headers.each.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n")
end
stream.headers(request.headers.each, end_stream: request.empty?)
end
+ def join_trailers(stream, request)
+ unless request.trailers?
+ stream.data("", end_stream: true) if request.callbacks_for?(:trailers)
+ return
+ end
+
+ log(level: 1, color: :yellow) do
+ request.trailers.each.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n")
+ end
+ stream.headers(request.trailers.each, end_stream: true)
+ end
+
def join_body(stream, request)
return if request.empty?
chunk = @drains.delete(request) || request.drain_body
while chunk
next_chunk = request.drain_body
log(level: 1, color: :green) { "#{stream.id}: -> DATA: #{chunk.bytesize} bytes..." }
log(level: 2, color: :green) { "#{stream.id}: -> #{chunk.inspect}" }
- stream.data(chunk, end_stream: !next_chunk)
- if next_chunk && @buffer.full?
+ stream.data(chunk, end_stream: !(next_chunk || request.trailers? || request.callbacks_for?(:trailers)))
+ if next_chunk && (@buffer.full? || request.body.unbounded_body?)
@drains[request] = next_chunk
throw(:buffer_full)
end
chunk = next_chunk
end
+
+ on_stream_refuse(stream, request, request.drain_error) if request.drain_error
end
######
# HTTP/2 Callbacks
######
def on_stream_headers(stream, request, h)
+ if request.response && request.response.version == "2.0"
+ on_stream_trailers(stream, request, h)
+ return
+ end
+
log(color: :yellow) do
h.map { |k, v| "#{stream.id}: <- HEADER: #{k}: #{v}" }.join("\n")
end
_, status = h.shift
headers = request.options.headers_class.new(h)
@@ -225,13 +257,25 @@
@streams[request] = stream
handle(request, stream) if request.expects?
end
+ def on_stream_trailers(stream, request, h)
+ log(color: :yellow) do
+ h.map { |k, v| "#{stream.id}: <- HEADER: #{k}: #{v}" }.join("\n")
+ end
+ request.response.merge_headers(h)
+ end
+
def on_stream_data(stream, request, data)
log(level: 1, color: :green) { "#{stream.id}: <- DATA: #{data.bytesize} bytes..." }
log(level: 2, color: :green) { "#{stream.id}: <- #{data.inspect}" }
request.response << data
+ end
+
+ def on_stream_refuse(stream, request, error)
+ stream.close
+ on_stream_close(stream, request, error)
end
def on_stream_close(stream, request, error)
log(level: 2) { "#{stream.id}: closing stream" }
@drains.delete(request)