lib/httpx/channel/http2.rb in httpx-0.2.1 vs lib/httpx/channel/http2.rb in httpx-0.3.0
- old
+ new
@@ -1,14 +1,31 @@
# frozen_string_literal: true
+require "io/wait"
require "http/2"
module HTTPX
class Channel::HTTP2
include Callbacks
include Loggable
+ if HTTP2::VERSION < "0.10.1"
+ module HTTP2Extensions
+ refine ::HTTP2::Client do
+ def receive(*)
+ send_connection_preface
+ super
+ end
+
+ def <<(*args)
+ receive(*args)
+ end
+ end
+ end
+ using HTTP2Extensions
+ end
+
Error = Class.new(Error) do
def initialize(id, code)
super("stream #{id} closed with error: #{code}")
end
end
@@ -16,15 +33,16 @@
attr_reader :streams, :pending
def initialize(buffer, options)
@options = Options.new(options)
@max_concurrent_requests = @options.max_concurrent_requests
- init_connection
@pending = []
@streams = {}
@drains = {}
@buffer = buffer
+ @handshake_completed = false
+ init_connection
end
def close
@connection.goaway
end
@@ -36,20 +54,22 @@
def <<(data)
@connection << data
end
def send(request, **)
- if @connection.active_stream_count >= @max_concurrent_requests
+ if !@handshake_completed ||
+ @connection.active_stream_count >= @max_concurrent_requests
@pending << request
return
end
unless (stream = @streams[request])
stream = @connection.new_stream
handle_stream(stream, request)
@streams[request] = stream
end
handle(request, stream)
+ true
end
def reenqueue!
requests = @streams.keys
@streams.clear
@@ -71,10 +91,16 @@
end
end
private
+ def send_pending
+ while (request = @pending.shift)
+ break unless send(request)
+ end
+ end
+
def headline_uri(request)
request.path
end
def set_request_headers(request); end
@@ -93,21 +119,28 @@
@connection = HTTP2::Client.new(@options.http2_settings)
@connection.on(:frame, &method(:on_frame))
@connection.on(:frame_sent, &method(:on_frame_sent))
@connection.on(:frame_received, &method(:on_frame_received))
@connection.on(:promise, &method(:on_promise))
- @connection.on(:altsvc, &method(:on_altsvc))
+ @connection.on(:altsvc) { |frame| on_altsvc(frame[:origin], frame) }
@connection.on(:settings_ack, &method(:on_settings))
@connection.on(:goaway, &method(:on_close))
+ #
+ # Some servers initiate HTTP/2 negotiation right away, some don't.
+ # As such, we have to check the socket buffer. If there is something
+ # to read, the server initiated the negotiation. If not, we have to
+ # initiate it.
+ #
+ @connection.send_connection_preface
end
def handle_stream(stream, request)
stream.on(:close, &method(:on_stream_close).curry[stream, request])
stream.on(:half_close) do
log(level: 2, label: "#{stream.id}: ") { "waiting for response..." }
end
- # stream.on(:altsvc)
+ stream.on(:altsvc, &method(:on_altsvc).curry[request.origin])
stream.on(:headers, &method(:on_stream_headers).curry[stream, request])
stream.on(:data, &method(:on_stream_data).curry[stream, request])
end
def join_headers(stream, request)
@@ -185,12 +218,14 @@
def on_frame(bytes)
@buffer << bytes
end
def on_settings(*)
+ @handshake_completed = true
@max_concurrent_requests = [@max_concurrent_requests,
@connection.remote_settings[:settings_max_concurrent_streams]].min
+ send_pending
end
def on_close(_last_frame, error, _payload)
if error
ex = Error.new(0, error)
@@ -223,12 +258,15 @@
frame.inspect
end
end
end
- def on_altsvc(frame)
+ def on_altsvc(origin, frame)
log(level: 2, label: "#{frame[:stream]}: ") { "altsvc frame was received" }
log(level: 2, label: "#{frame[:stream]}: ") { frame.inspect }
+ alt_origin = URI.parse("#{frame[:proto]}://#{frame[:host]}:#{frame[:port]}")
+ params = { "ma" => frame[:max_age] }
+ emit(:altsvc, origin, alt_origin, origin, params)
end
def on_promise(stream)
emit(:promise, self, stream)
end