lib/httpx/connection/http2.rb in httpx-0.11.3 vs lib/httpx/connection/http2.rb in httpx-0.12.0
- old
+ new
@@ -40,16 +40,20 @@
unless (@connection.state == :connected && @handshake_completed)
return @buffer.empty? ? :r : :rw
end
- return :w unless @pending.empty?
+ return :w if !@pending.empty? && can_buffer_more_requests?
return :w if @streams.each_key.any? { |r| r.interests == :w }
- return :r if @buffer.empty?
+ if @buffer.empty?
+ return if @streams.empty? && @pings.empty?
+ return :r
+ end
+
:rw
end
def close
@connection.goaway unless @connection.state == :closed
@@ -68,23 +72,28 @@
def <<(data)
@connection << data
end
+ def can_buffer_more_requests?
+ @handshake_completed &&
+ @streams.size < @max_concurrent_requests &&
+ @streams.size < @max_requests
+ end
+
def send(request)
- if !@handshake_completed ||
- @streams.size >= @max_concurrent_requests ||
- @streams.size >= @max_requests
+ unless can_buffer_more_requests?
@pending << request
return
end
unless (stream = @streams[request])
stream = @connection.new_stream
handle_stream(stream, request)
@streams[request] = stream
@max_requests -= 1
end
+ request.once(:headers, &method(:set_protocol_headers))
handle(request, stream)
true
rescue HTTP2Next::Error::StreamLimitExceeded
@pending.unshift(request)
emit(:exhausted)
@@ -124,12 +133,10 @@
def headline_uri(request)
request.path
end
- def set_request_headers(request); end
-
def handle(request, stream)
catch(:buffer_full) do
request.transition(:headers)
join_headers(stream, request) if request.state == :headers
request.transition(:body)
@@ -170,22 +177,22 @@
stream.on(:altsvc, &method(:on_altsvc).curry(2)[request.origin])
stream.on(:headers, &method(:on_stream_headers).curry(3)[stream, request])
stream.on(:data, &method(:on_stream_data).curry(3)[stream, request])
end
+ def set_protocol_headers(request)
+ request.headers[":scheme"] = request.scheme
+ request.headers[":method"] = request.verb.to_s.upcase
+ request.headers[":path"] = headline_uri(request)
+ request.headers[":authority"] = request.authority
+ end
+
def join_headers(stream, request)
- set_request_headers(request)
- headers = {}
- headers[":scheme"] = request.scheme
- headers[":method"] = request.verb.to_s.upcase
- headers[":path"] = headline_uri(request)
- headers[":authority"] = request.authority
- headers = headers.merge(request.headers)
log(level: 1, color: :yellow) do
- headers.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n")
+ request.headers.each.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n")
end
- stream.headers(headers, end_stream: request.empty?)
+ stream.headers(request.headers.each, end_stream: request.empty?)
end
def join_body(stream, request)
return if request.empty?
@@ -225,27 +232,29 @@
log(level: 2, color: :green) { "#{stream.id}: <- #{data.inspect}" }
request.response << data
end
def on_stream_close(stream, request, error)
+ log(level: 2) { "#{stream.id}: closing stream" }
+ @drains.delete(request)
+ @streams.delete(request)
+
if error && error != :no_error
ex = Error.new(stream.id, error)
ex.set_backtrace(caller)
- emit(:error, request, ex)
+ response = ErrorResponse.new(request, ex, request.options)
+ emit(:response, request, response)
else
response = request.response
if response.status == 421
ex = MisdirectedRequestError.new(response)
ex.set_backtrace(caller)
emit(:error, request, ex)
else
emit(:response, request, response)
end
end
- log(level: 2) { "#{stream.id}: closing stream" }
-
- @streams.delete(request)
send(@pending.shift) unless @pending.empty?
return unless @streams.empty? && exhausted?
close
emit(:exhausted) unless @pending.empty?
@@ -326,14 +335,12 @@
def respond_to_missing?(meth, *args)
@connection.respond_to?(meth, *args) || super
end
def method_missing(meth, *args, &blk)
- if @connection.respond_to?(meth)
- @connection.__send__(meth, *args, &blk)
- else
- super
- end
+ return super unless @connection.respond_to?(meth)
+
+ @connection.__send__(meth, *args, &blk)
end
end
Connection.register "h2", Connection::HTTP2
end