lib/httpx/channel.rb in httpx-0.2.1 vs lib/httpx/channel.rb in httpx-0.3.0
- old
+ new
@@ -33,46 +33,53 @@
extend Forwardable
include Registry
include Loggable
include Callbacks
+ using URIExtensions
+
require "httpx/channel/http2"
require "httpx/channel/http1"
BUFFER_SIZE = 1 << 14
class << self
def by(uri, options)
type = options.transport || begin
case uri.scheme
- when "http" then "tcp"
- when "https" then "ssl"
+ when "http"
+ "tcp"
+ when "https"
+ "ssl"
+ when "h2"
+ options = options.merge(ssl: { alpn_protocols: %(h2) })
+ "ssl"
else
- raise Error, "#{uri}: #{uri.scheme}: unsupported URI scheme"
+ raise UnsupportedSchemeError, "#{uri}: #{uri.scheme}: unsupported URI scheme"
end
end
new(type, uri, options)
end
end
def_delegator :@io, :closed?
def_delegator :@write_buffer, :empty?
- attr_reader :uri, :state
+ attr_reader :uri, :state, :pending
def initialize(type, uri, options)
@type = type
@uri = uri
- @hostnames = [@uri.host]
+ @origins = [@uri.origin]
@options = Options.new(options)
@window_size = @options.window_size
@read_buffer = Buffer.new(BUFFER_SIZE)
@write_buffer = Buffer.new(BUFFER_SIZE)
@pending = []
- @state = :idle
on(:error) { |ex| on_error(ex) }
+ transition(:idle)
end
def addresses=(addrs)
@io = IO.registry(@type).new(@uri, addrs, @options)
end
@@ -86,45 +93,60 @@
# but internally, #mergeable? is called before #coalescable?
def coalescable?(channel)
if @io.protocol == "h2" && @uri.scheme == "https"
@io.verify_hostname(channel.uri.host)
else
- @uri.host == channel.uri.host &&
- @uri.port == channel.uri.port &&
- @uri.scheme == channel.uri.scheme
+ @uri.origin == channel.uri.origin
end
end
def merge(channel)
- @hostnames += channel.instance_variable_get(:@hostnames)
+ @origins += channel.instance_variable_get(:@origins)
pending = channel.instance_variable_get(:@pending)
pending.each do |req, args|
send(req, args)
end
end
def unmerge(channel)
- @hostnames -= channel.instance_variable_get(:@hostnames)
+ @origins -= channel.instance_variable_get(:@origins)
+ purge_pending do |request, args|
+ request.uri == channel.uri && begin
+ request.transition(:idle)
+ channel.send(request, *args)
+ true
+ end
+ end
+ end
+
+ def purge_pending
[@parser.pending, @pending].each do |pending|
- pending.reject! do |request|
- request.uri == channel.uri && begin
- request.transition(:idle)
- channel.send(request)
- true
- end
+ pending.reject! do |request, *args|
+ yield(request, args)
end
end
end
def match?(uri)
return false if @state == :closing
- @hostnames.include?(uri.host) &&
- uri.port == @uri.port &&
- uri.scheme == @uri.scheme
+ @origins.include?(uri.origin) || match_altsvcs?(uri)
end
+ # checks if this is channel is an alternative service of
+ # +uri+
+ def match_altsvcs?(uri)
+ AltSvc.cached_altsvc(@uri.origin).any? do |altsvc|
+ origin = altsvc["origin"]
+ origin.altsvc_match?(uri.origin)
+ end
+ end
+
+ def connecting?
+ @state == :idle
+ end
+
def interests
return :w if @state == :idle
readable = !@read_buffer.full?
writable = !@write_buffer.empty?
if readable
@@ -155,17 +177,19 @@
def send(request, **args)
if @error_response
emit(:response, request, @error_response)
elsif @parser && !@write_buffer.full?
+ request.headers["alt-used"] = @uri.authority if match_altsvcs?(request.uri)
parser.send(request, **args)
else
@pending << [request, args]
end
end
def call
+ @timeout = @timeout_threshold
case @state
when :closed
return
when :closing
dwrite
@@ -180,10 +204,21 @@
def upgrade_parser(protocol)
@parser.reset if @parser
@parser = build_parser(protocol)
end
+ def handle_timeout_error(e)
+ return emit(:error, e) unless @timeout
+ @timeout -= e.timeout
+ return unless @timeout <= 0
+ if connecting?
+ emit(:error, e.to_connection_error)
+ else
+ emit(:error, e)
+ end
+ end
+
private
def consume
catch(:called) do
dread
@@ -202,10 +237,11 @@
return
end
return if siz.zero?
log { "READ: #{siz} bytes..." }
parser << @read_buffer.to_s
+ return if @state == :closing || @state == :closed
end
end
def dwrite
loop do
@@ -217,10 +253,11 @@
on_error(ex)
return
end
log { "WRITE: #{siz} bytes..." }
return if siz.zero?
+ return if @state == :closing || @state == :closed
end
end
def send_pending
while !@write_buffer.full? && (req_args = @pending.shift)
@@ -234,26 +271,37 @@
end
def build_parser(protocol = @io.protocol)
parser = registry(protocol).new(@write_buffer, @options)
parser.on(:response) do |*args|
+ AltSvc.emit(*args) do |alt_origin, origin, alt_params|
+ emit(:altsvc, alt_origin, origin, alt_params)
+ end
emit(:response, *args)
end
+ parser.on(:altsvc) do |alt_origin, origin, alt_params|
+ emit(:altsvc, alt_origin, origin, alt_params)
+ end
+
parser.on(:promise) do |*args|
emit(:promise, *args)
end
parser.on(:close) do
transition(:closing)
end
parser.on(:reset) do
transition(:closing)
unless parser.empty?
transition(:closed)
+ emit(:reset)
transition(:idle)
transition(:open)
end
end
+ parser.on(:timeout) do |timeout|
+ @timeout = timeout
+ end
parser.on(:error) do |request, ex|
case ex
when MisdirectedRequestError
emit(:uncoalesce, request.uri)
else
@@ -267,14 +315,18 @@
def transition(nextstate)
case nextstate
# when :idle
when :idle
@error_response = nil
+ @timeout_threshold = @options.timeout.connect_timeout
+ @timeout = @timeout_threshold
when :open
return if @state == :closed
@io.connect
return unless @io.connected?
send_pending
+ @timeout_threshold = @options.timeout.operation_timeout
+ @timeout = @timeout_threshold
emit(:open)
when :closing
return unless @state == :open
when :closed
return unless @state == :closing