lib/httpx/connection/http2.rb in httpx-0.13.2 vs lib/httpx/connection/http2.rb in httpx-0.14.0

- old
+ new

@@ -19,27 +19,33 @@ attr_reader :streams, :pending def initialize(buffer, options) @options = Options.new(options) - @max_concurrent_requests = @options.max_concurrent_requests || MAX_CONCURRENT_REQUESTS - @max_requests = @options.max_requests || 0 + @settings = @options.http2_settings @pending = [] @streams = {} @drains = {} @pings = [] @buffer = buffer @handshake_completed = false + @wait_for_handshake = @settings.key?(:wait_for_handshake) ? @settings.delete(:wait_for_handshake) : true + @max_concurrent_requests = @options.max_concurrent_requests || MAX_CONCURRENT_REQUESTS + @max_requests = @options.max_requests || 0 init_connection end def interests # waiting for WINDOW_UPDATE frames return :r if @buffer.full? - return :w if @connection.state == :closed + if @connection.state == :closed + return unless @handshake_completed + return :w + end + unless (@connection.state == :connected && @handshake_completed) return @buffer.empty? ? :r : :rw end return :w if !@pending.empty? && can_buffer_more_requests? @@ -73,13 +79,17 @@ def <<(data) @connection << data end def can_buffer_more_requests? - @handshake_completed && + if @handshake_completed @streams.size < @max_concurrent_requests && - @streams.size < @max_requests + @streams.size < @max_requests + else + !@wait_for_handshake && + @streams.size < @max_concurrent_requests + end end def send(request) unless can_buffer_more_requests? @pending << request @@ -138,16 +148,18 @@ catch(:buffer_full) do request.transition(:headers) join_headers(stream, request) if request.state == :headers request.transition(:body) join_body(stream, request) if request.state == :body + request.transition(:trailers) + join_trailers(stream, request) if request.state == :trailers && !request.body.empty? request.transition(:done) end end def init_connection - @connection = HTTP2Next::Client.new(@options.http2_settings) + @connection = HTTP2Next::Client.new(@settings) @connection.max_streams = @max_requests if @connection.respond_to?(:max_streams=) && @max_requests.positive? @connection.on(:frame, &method(:on_frame)) @connection.on(:frame_sent, &method(:on_frame_sent)) @connection.on(:frame_received, &method(:on_frame_received)) @connection.on(:origin, &method(:on_origin)) @@ -167,10 +179,11 @@ alias_method :reset, :init_connection public :reset def handle_stream(stream, request) + request.on(:refuse, &method(:on_stream_refuse).curry(3)[stream, request]) stream.on(:close, &method(:on_stream_close).curry(3)[stream, request]) stream.on(:half_close) do log(level: 2) { "#{stream.id}: waiting for response..." } end stream.on(:altsvc, &method(:on_altsvc).curry(2)[request.origin]) @@ -191,32 +204,51 @@ request.headers.each.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n") end stream.headers(request.headers.each, end_stream: request.empty?) end + def join_trailers(stream, request) + unless request.trailers? + stream.data("", end_stream: true) if request.callbacks_for?(:trailers) + return + end + + log(level: 1, color: :yellow) do + request.trailers.each.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n") + end + stream.headers(request.trailers.each, end_stream: true) + end + def join_body(stream, request) return if request.empty? chunk = @drains.delete(request) || request.drain_body while chunk next_chunk = request.drain_body log(level: 1, color: :green) { "#{stream.id}: -> DATA: #{chunk.bytesize} bytes..." } log(level: 2, color: :green) { "#{stream.id}: -> #{chunk.inspect}" } - stream.data(chunk, end_stream: !next_chunk) - if next_chunk && @buffer.full? + stream.data(chunk, end_stream: !(next_chunk || request.trailers? || request.callbacks_for?(:trailers))) + if next_chunk && (@buffer.full? || request.body.unbounded_body?) @drains[request] = next_chunk throw(:buffer_full) end chunk = next_chunk end + + on_stream_refuse(stream, request, request.drain_error) if request.drain_error end ###### # HTTP/2 Callbacks ###### def on_stream_headers(stream, request, h) + if request.response && request.response.version == "2.0" + on_stream_trailers(stream, request, h) + return + end + log(color: :yellow) do h.map { |k, v| "#{stream.id}: <- HEADER: #{k}: #{v}" }.join("\n") end _, status = h.shift headers = request.options.headers_class.new(h) @@ -225,13 +257,25 @@ @streams[request] = stream handle(request, stream) if request.expects? end + def on_stream_trailers(stream, request, h) + log(color: :yellow) do + h.map { |k, v| "#{stream.id}: <- HEADER: #{k}: #{v}" }.join("\n") + end + request.response.merge_headers(h) + end + def on_stream_data(stream, request, data) log(level: 1, color: :green) { "#{stream.id}: <- DATA: #{data.bytesize} bytes..." } log(level: 2, color: :green) { "#{stream.id}: <- #{data.inspect}" } request.response << data + end + + def on_stream_refuse(stream, request, error) + stream.close + on_stream_close(stream, request, error) end def on_stream_close(stream, request, error) log(level: 2) { "#{stream.id}: closing stream" } @drains.delete(request)