lib/httpx/connection/http2.rb in httpx-0.7.0 vs lib/httpx/connection/http2.rb in httpx-0.8.0

- old
+ new

@@ -28,19 +28,43 @@ @buffer = buffer @handshake_completed = false init_connection end + def interests + # waiting for WINDOW_UPDATE frames + return :r if @buffer.full? + + return :w if @connection.state == :closed + + return :r unless (@connection.state == :connected && @handshake_completed) + + return :w unless @pending.empty? + + return :w if @streams.each_key.any? { |r| r.interests == :w } + + return :r if @buffer.empty? + + :rw + end + + def reset + init_connection + end + def close @connection.goaway unless @connection.state == :closed + emit(:close) end def empty? @connection.state == :closed || @streams.empty? end def exhausted? + return false if @max_requests.zero? && @connection.active_stream_count.zero? + @connection.active_stream_count >= @max_requests end def <<(data) @connection << data @@ -66,10 +90,12 @@ emit(:exhausted) end def consume @streams.each do |request, stream| + next if request.state == :done + handle(request, stream) end end def handle_error(ex) @@ -126,11 +152,11 @@ end def handle_stream(stream, request) stream.on(:close, &method(:on_stream_close).curry[stream, request]) stream.on(:half_close) do - log(level: 2, label: "#{stream.id}: ") { "waiting for response..." } + log(level: 2) { "#{stream.id}: waiting for response..." } end stream.on(:altsvc, &method(:on_altsvc).curry[request.origin]) stream.on(:headers, &method(:on_stream_headers).curry[stream, request]) stream.on(:data, &method(:on_stream_data).curry[stream, request]) end @@ -141,24 +167,24 @@ headers[":scheme"] = request.scheme headers[":method"] = request.verb.to_s.upcase headers[":path"] = headline_uri(request) headers[":authority"] = request.authority headers = headers.merge(request.headers) - log(level: 1, label: "#{stream.id}: ", color: :yellow) do - headers.map { |k, v| "-> HEADER: #{k}: #{v}" }.join("\n") + log(level: 1, color: :yellow) do + headers.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n") end stream.headers(headers, end_stream: request.empty?) end def join_body(stream, request) return if request.empty? chunk = @drains.delete(request) || request.drain_body while chunk next_chunk = request.drain_body - log(level: 1, label: "#{stream.id}: ", color: :green) { "-> DATA: #{chunk.bytesize} bytes..." } - log(level: 2, label: "#{stream.id}: ", color: :green) { "-> #{chunk.inspect}" } + log(level: 1, color: :green) { "#{stream.id}: -> DATA: #{chunk.bytesize} bytes..." } + log(level: 2, color: :green) { "#{stream.id}: -> #{chunk.inspect}" } stream.data(chunk, end_stream: !next_chunk) if next_chunk && @buffer.full? @drains[request] = next_chunk throw(:buffer_full) end @@ -169,29 +195,29 @@ ###### # HTTP/2 Callbacks ###### def on_stream_headers(stream, request, h) - log(label: "#{stream.id}:", color: :yellow) do - h.map { |k, v| "<- HEADER: #{k}: #{v}" }.join("\n") + log(color: :yellow) do + h.map { |k, v| "#{stream.id}: <- HEADER: #{k}: #{v}" }.join("\n") end _, status = h.shift headers = request.options.headers_class.new(h) response = request.options.response_class.new(request, status, "2.0", headers) request.response = response @streams[request] = stream + + handle(request, stream) if request.expects? end def on_stream_data(stream, request, data) - log(level: 1, label: "#{stream.id}: ", color: :green) { "<- DATA: #{data.bytesize} bytes..." } - log(level: 2, label: "#{stream.id}: ", color: :green) { "<- #{data.inspect}" } + log(level: 1, color: :green) { "#{stream.id}: <- DATA: #{data.bytesize} bytes..." } + log(level: 2, color: :green) { "#{stream.id}: <- #{data.inspect}" } request.response << data end def on_stream_close(stream, request, error) - return handle(request, stream) if request.expects? - if error && error != :no_error ex = Error.new(stream.id, error) ex.set_backtrace(caller) emit(:error, request, ex) else @@ -202,30 +228,33 @@ emit(:error, request, ex) else emit(:response, request, response) end end - log(level: 2, label: "#{stream.id}: ") { "closing stream" } + log(level: 2) { "#{stream.id}: closing stream" } @streams.delete(request) send(@pending.shift) unless @pending.empty? return unless @streams.empty? && exhausted? close - emit(:close) emit(:exhausted) unless @pending.empty? end def on_frame(bytes) @buffer << bytes end def on_settings(*) @handshake_completed = true - @max_requests = [@max_requests, @connection.remote_settings[:settings_max_concurrent_streams]].max + if @max_requests.zero? + @max_requests = @connection.remote_settings[:settings_max_concurrent_streams] + @connection.max_streams = @max_requests if @connection.respond_to?(:max_streams=) && @max_requests.positive? + end + @max_concurrent_requests = [@max_concurrent_requests, @max_requests].min send_pending end def on_close(_last_frame, error, _payload) @@ -240,35 +269,29 @@ emit(:close) end def on_frame_sent(frame) - log(level: 2, label: "#{frame[:stream]}: ") { "frame was sent!" } - log(level: 2, label: "#{frame[:stream]}: ", color: :blue) do - case frame[:type] - when :data - frame.merge(payload: frame[:payload].bytesize).inspect - else - frame.inspect - end + log(level: 2) { "#{frame[:stream]}: frame was sent!" } + log(level: 2, color: :blue) do + payload = frame + payload = payload.merge(payload: frame[:payload].bytesize) if frame[:type] == :data + "#{frame[:stream]}: #{payload}" end end def on_frame_received(frame) - log(level: 2, label: "#{frame[:stream]}: ") { "frame was received!" } - log(level: 2, label: "#{frame[:stream]}: ", color: :magenta) do - case frame[:type] - when :data - frame.merge(payload: frame[:payload].bytesize).inspect - else - frame.inspect - end + log(level: 2) { "#{frame[:stream]}: frame was received!" } + log(level: 2, color: :magenta) do + payload = frame + payload = payload.merge(payload: frame[:payload].bytesize) if frame[:type] == :data + "#{frame[:stream]}: #{payload}" end end def on_altsvc(origin, frame) - log(level: 2, label: "#{frame[:stream]}: ") { "altsvc frame was received" } - log(level: 2, label: "#{frame[:stream]}: ") { frame.inspect } + log(level: 2) { "#{frame[:stream]}: altsvc frame was received" } + log(level: 2) { "#{frame[:stream]}: #{frame.inspect}" } alt_origin = URI.parse("#{frame[:proto]}://#{frame[:host]}:#{frame[:port]}") params = { "ma" => frame[:max_age] } emit(:altsvc, origin, alt_origin, origin, params) end