lib/httpx/connection/http2.rb in httpx-0.11.3 vs lib/httpx/connection/http2.rb in httpx-0.12.0

- old
+ new

@@ -40,16 +40,20 @@ unless (@connection.state == :connected && @handshake_completed) return @buffer.empty? ? :r : :rw end - return :w unless @pending.empty? + return :w if !@pending.empty? && can_buffer_more_requests? return :w if @streams.each_key.any? { |r| r.interests == :w } - return :r if @buffer.empty? + if @buffer.empty? + return if @streams.empty? && @pings.empty? + return :r + end + :rw end def close @connection.goaway unless @connection.state == :closed @@ -68,23 +72,28 @@ def <<(data) @connection << data end + def can_buffer_more_requests? + @handshake_completed && + @streams.size < @max_concurrent_requests && + @streams.size < @max_requests + end + def send(request) - if !@handshake_completed || - @streams.size >= @max_concurrent_requests || - @streams.size >= @max_requests + unless can_buffer_more_requests? @pending << request return end unless (stream = @streams[request]) stream = @connection.new_stream handle_stream(stream, request) @streams[request] = stream @max_requests -= 1 end + request.once(:headers, &method(:set_protocol_headers)) handle(request, stream) true rescue HTTP2Next::Error::StreamLimitExceeded @pending.unshift(request) emit(:exhausted) @@ -124,12 +133,10 @@ def headline_uri(request) request.path end - def set_request_headers(request); end - def handle(request, stream) catch(:buffer_full) do request.transition(:headers) join_headers(stream, request) if request.state == :headers request.transition(:body) @@ -170,22 +177,22 @@ stream.on(:altsvc, &method(:on_altsvc).curry(2)[request.origin]) stream.on(:headers, &method(:on_stream_headers).curry(3)[stream, request]) stream.on(:data, &method(:on_stream_data).curry(3)[stream, request]) end + def set_protocol_headers(request) + request.headers[":scheme"] = request.scheme + request.headers[":method"] = request.verb.to_s.upcase + request.headers[":path"] = headline_uri(request) + request.headers[":authority"] = request.authority + end + def join_headers(stream, request) - set_request_headers(request) - headers = {} - headers[":scheme"] = request.scheme - headers[":method"] = request.verb.to_s.upcase - headers[":path"] = headline_uri(request) - headers[":authority"] = request.authority - headers = headers.merge(request.headers) log(level: 1, color: :yellow) do - headers.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n") + request.headers.each.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n") end - stream.headers(headers, end_stream: request.empty?) + stream.headers(request.headers.each, end_stream: request.empty?) end def join_body(stream, request) return if request.empty? @@ -225,27 +232,29 @@ log(level: 2, color: :green) { "#{stream.id}: <- #{data.inspect}" } request.response << data end def on_stream_close(stream, request, error) + log(level: 2) { "#{stream.id}: closing stream" } + @drains.delete(request) + @streams.delete(request) + if error && error != :no_error ex = Error.new(stream.id, error) ex.set_backtrace(caller) - emit(:error, request, ex) + response = ErrorResponse.new(request, ex, request.options) + emit(:response, request, response) else response = request.response if response.status == 421 ex = MisdirectedRequestError.new(response) ex.set_backtrace(caller) emit(:error, request, ex) else emit(:response, request, response) end end - log(level: 2) { "#{stream.id}: closing stream" } - - @streams.delete(request) send(@pending.shift) unless @pending.empty? return unless @streams.empty? && exhausted? close emit(:exhausted) unless @pending.empty? @@ -326,14 +335,12 @@ def respond_to_missing?(meth, *args) @connection.respond_to?(meth, *args) || super end def method_missing(meth, *args, &blk) - if @connection.respond_to?(meth) - @connection.__send__(meth, *args, &blk) - else - super - end + return super unless @connection.respond_to?(meth) + + @connection.__send__(meth, *args, &blk) end end Connection.register "h2", Connection::HTTP2 end