lib/httpx/channel/http2.rb in httpx-0.2.1 vs lib/httpx/channel/http2.rb in httpx-0.3.0

- old
+ new

@@ -1,14 +1,31 @@ # frozen_string_literal: true +require "io/wait" require "http/2" module HTTPX class Channel::HTTP2 include Callbacks include Loggable + if HTTP2::VERSION < "0.10.1" + module HTTP2Extensions + refine ::HTTP2::Client do + def receive(*) + send_connection_preface + super + end + + def <<(*args) + receive(*args) + end + end + end + using HTTP2Extensions + end + Error = Class.new(Error) do def initialize(id, code) super("stream #{id} closed with error: #{code}") end end @@ -16,15 +33,16 @@ attr_reader :streams, :pending def initialize(buffer, options) @options = Options.new(options) @max_concurrent_requests = @options.max_concurrent_requests - init_connection @pending = [] @streams = {} @drains = {} @buffer = buffer + @handshake_completed = false + init_connection end def close @connection.goaway end @@ -36,20 +54,22 @@ def <<(data) @connection << data end def send(request, **) - if @connection.active_stream_count >= @max_concurrent_requests + if !@handshake_completed || + @connection.active_stream_count >= @max_concurrent_requests @pending << request return end unless (stream = @streams[request]) stream = @connection.new_stream handle_stream(stream, request) @streams[request] = stream end handle(request, stream) + true end def reenqueue! requests = @streams.keys @streams.clear @@ -71,10 +91,16 @@ end end private + def send_pending + while (request = @pending.shift) + break unless send(request) + end + end + def headline_uri(request) request.path end def set_request_headers(request); end @@ -93,21 +119,28 @@ @connection = HTTP2::Client.new(@options.http2_settings) @connection.on(:frame, &method(:on_frame)) @connection.on(:frame_sent, &method(:on_frame_sent)) @connection.on(:frame_received, &method(:on_frame_received)) @connection.on(:promise, &method(:on_promise)) - @connection.on(:altsvc, &method(:on_altsvc)) + @connection.on(:altsvc) { |frame| on_altsvc(frame[:origin], frame) } @connection.on(:settings_ack, &method(:on_settings)) @connection.on(:goaway, &method(:on_close)) + # + # Some servers initiate HTTP/2 negotiation right away, some don't. + # As such, we have to check the socket buffer. If there is something + # to read, the server initiated the negotiation. If not, we have to + # initiate it. + # + @connection.send_connection_preface end def handle_stream(stream, request) stream.on(:close, &method(:on_stream_close).curry[stream, request]) stream.on(:half_close) do log(level: 2, label: "#{stream.id}: ") { "waiting for response..." } end - # stream.on(:altsvc) + stream.on(:altsvc, &method(:on_altsvc).curry[request.origin]) stream.on(:headers, &method(:on_stream_headers).curry[stream, request]) stream.on(:data, &method(:on_stream_data).curry[stream, request]) end def join_headers(stream, request) @@ -185,12 +218,14 @@ def on_frame(bytes) @buffer << bytes end def on_settings(*) + @handshake_completed = true @max_concurrent_requests = [@max_concurrent_requests, @connection.remote_settings[:settings_max_concurrent_streams]].min + send_pending end def on_close(_last_frame, error, _payload) if error ex = Error.new(0, error) @@ -223,12 +258,15 @@ frame.inspect end end end - def on_altsvc(frame) + def on_altsvc(origin, frame) log(level: 2, label: "#{frame[:stream]}: ") { "altsvc frame was received" } log(level: 2, label: "#{frame[:stream]}: ") { frame.inspect } + alt_origin = URI.parse("#{frame[:proto]}://#{frame[:host]}:#{frame[:port]}") + params = { "ma" => frame[:max_age] } + emit(:altsvc, origin, alt_origin, origin, params) end def on_promise(stream) emit(:promise, self, stream) end