lib/httpx/connection/http2.rb in httpx-0.7.0 vs lib/httpx/connection/http2.rb in httpx-0.8.0
- old
+ new
@@ -28,19 +28,43 @@
@buffer = buffer
@handshake_completed = false
init_connection
end
+ def interests
+ # waiting for WINDOW_UPDATE frames
+ return :r if @buffer.full?
+
+ return :w if @connection.state == :closed
+
+ return :r unless (@connection.state == :connected && @handshake_completed)
+
+ return :w unless @pending.empty?
+
+ return :w if @streams.each_key.any? { |r| r.interests == :w }
+
+ return :r if @buffer.empty?
+
+ :rw
+ end
+
+ def reset
+ init_connection
+ end
+
def close
@connection.goaway unless @connection.state == :closed
+ emit(:close)
end
def empty?
@connection.state == :closed || @streams.empty?
end
def exhausted?
+ return false if @max_requests.zero? && @connection.active_stream_count.zero?
+
@connection.active_stream_count >= @max_requests
end
def <<(data)
@connection << data
@@ -66,10 +90,12 @@
emit(:exhausted)
end
def consume
@streams.each do |request, stream|
+ next if request.state == :done
+
handle(request, stream)
end
end
def handle_error(ex)
@@ -126,11 +152,11 @@
end
def handle_stream(stream, request)
stream.on(:close, &method(:on_stream_close).curry[stream, request])
stream.on(:half_close) do
- log(level: 2, label: "#{stream.id}: ") { "waiting for response..." }
+ log(level: 2) { "#{stream.id}: waiting for response..." }
end
stream.on(:altsvc, &method(:on_altsvc).curry[request.origin])
stream.on(:headers, &method(:on_stream_headers).curry[stream, request])
stream.on(:data, &method(:on_stream_data).curry[stream, request])
end
@@ -141,24 +167,24 @@
headers[":scheme"] = request.scheme
headers[":method"] = request.verb.to_s.upcase
headers[":path"] = headline_uri(request)
headers[":authority"] = request.authority
headers = headers.merge(request.headers)
- log(level: 1, label: "#{stream.id}: ", color: :yellow) do
- headers.map { |k, v| "-> HEADER: #{k}: #{v}" }.join("\n")
+ log(level: 1, color: :yellow) do
+ headers.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n")
end
stream.headers(headers, end_stream: request.empty?)
end
def join_body(stream, request)
return if request.empty?
chunk = @drains.delete(request) || request.drain_body
while chunk
next_chunk = request.drain_body
- log(level: 1, label: "#{stream.id}: ", color: :green) { "-> DATA: #{chunk.bytesize} bytes..." }
- log(level: 2, label: "#{stream.id}: ", color: :green) { "-> #{chunk.inspect}" }
+ log(level: 1, color: :green) { "#{stream.id}: -> DATA: #{chunk.bytesize} bytes..." }
+ log(level: 2, color: :green) { "#{stream.id}: -> #{chunk.inspect}" }
stream.data(chunk, end_stream: !next_chunk)
if next_chunk && @buffer.full?
@drains[request] = next_chunk
throw(:buffer_full)
end
@@ -169,29 +195,29 @@
######
# HTTP/2 Callbacks
######
def on_stream_headers(stream, request, h)
- log(label: "#{stream.id}:", color: :yellow) do
- h.map { |k, v| "<- HEADER: #{k}: #{v}" }.join("\n")
+ log(color: :yellow) do
+ h.map { |k, v| "#{stream.id}: <- HEADER: #{k}: #{v}" }.join("\n")
end
_, status = h.shift
headers = request.options.headers_class.new(h)
response = request.options.response_class.new(request, status, "2.0", headers)
request.response = response
@streams[request] = stream
+
+ handle(request, stream) if request.expects?
end
def on_stream_data(stream, request, data)
- log(level: 1, label: "#{stream.id}: ", color: :green) { "<- DATA: #{data.bytesize} bytes..." }
- log(level: 2, label: "#{stream.id}: ", color: :green) { "<- #{data.inspect}" }
+ log(level: 1, color: :green) { "#{stream.id}: <- DATA: #{data.bytesize} bytes..." }
+ log(level: 2, color: :green) { "#{stream.id}: <- #{data.inspect}" }
request.response << data
end
def on_stream_close(stream, request, error)
- return handle(request, stream) if request.expects?
-
if error && error != :no_error
ex = Error.new(stream.id, error)
ex.set_backtrace(caller)
emit(:error, request, ex)
else
@@ -202,30 +228,33 @@
emit(:error, request, ex)
else
emit(:response, request, response)
end
end
- log(level: 2, label: "#{stream.id}: ") { "closing stream" }
+ log(level: 2) { "#{stream.id}: closing stream" }
@streams.delete(request)
send(@pending.shift) unless @pending.empty?
return unless @streams.empty? && exhausted?
close
- emit(:close)
emit(:exhausted) unless @pending.empty?
end
def on_frame(bytes)
@buffer << bytes
end
def on_settings(*)
@handshake_completed = true
- @max_requests = [@max_requests, @connection.remote_settings[:settings_max_concurrent_streams]].max
+ if @max_requests.zero?
+ @max_requests = @connection.remote_settings[:settings_max_concurrent_streams]
+ @connection.max_streams = @max_requests if @connection.respond_to?(:max_streams=) && @max_requests.positive?
+ end
+
@max_concurrent_requests = [@max_concurrent_requests, @max_requests].min
send_pending
end
def on_close(_last_frame, error, _payload)
@@ -240,35 +269,29 @@
emit(:close)
end
def on_frame_sent(frame)
- log(level: 2, label: "#{frame[:stream]}: ") { "frame was sent!" }
- log(level: 2, label: "#{frame[:stream]}: ", color: :blue) do
- case frame[:type]
- when :data
- frame.merge(payload: frame[:payload].bytesize).inspect
- else
- frame.inspect
- end
+ log(level: 2) { "#{frame[:stream]}: frame was sent!" }
+ log(level: 2, color: :blue) do
+ payload = frame
+ payload = payload.merge(payload: frame[:payload].bytesize) if frame[:type] == :data
+ "#{frame[:stream]}: #{payload}"
end
end
def on_frame_received(frame)
- log(level: 2, label: "#{frame[:stream]}: ") { "frame was received!" }
- log(level: 2, label: "#{frame[:stream]}: ", color: :magenta) do
- case frame[:type]
- when :data
- frame.merge(payload: frame[:payload].bytesize).inspect
- else
- frame.inspect
- end
+ log(level: 2) { "#{frame[:stream]}: frame was received!" }
+ log(level: 2, color: :magenta) do
+ payload = frame
+ payload = payload.merge(payload: frame[:payload].bytesize) if frame[:type] == :data
+ "#{frame[:stream]}: #{payload}"
end
end
def on_altsvc(origin, frame)
- log(level: 2, label: "#{frame[:stream]}: ") { "altsvc frame was received" }
- log(level: 2, label: "#{frame[:stream]}: ") { frame.inspect }
+ log(level: 2) { "#{frame[:stream]}: altsvc frame was received" }
+ log(level: 2) { "#{frame[:stream]}: #{frame.inspect}" }
alt_origin = URI.parse("#{frame[:proto]}://#{frame[:host]}:#{frame[:port]}")
params = { "ma" => frame[:max_age] }
emit(:altsvc, origin, alt_origin, origin, params)
end