lib/httpx/channel.rb in httpx-0.2.1 vs lib/httpx/channel.rb in httpx-0.3.0

- old
+ new

@@ -33,46 +33,53 @@ extend Forwardable include Registry include Loggable include Callbacks + using URIExtensions + require "httpx/channel/http2" require "httpx/channel/http1" BUFFER_SIZE = 1 << 14 class << self def by(uri, options) type = options.transport || begin case uri.scheme - when "http" then "tcp" - when "https" then "ssl" + when "http" + "tcp" + when "https" + "ssl" + when "h2" + options = options.merge(ssl: { alpn_protocols: %(h2) }) + "ssl" else - raise Error, "#{uri}: #{uri.scheme}: unsupported URI scheme" + raise UnsupportedSchemeError, "#{uri}: #{uri.scheme}: unsupported URI scheme" end end new(type, uri, options) end end def_delegator :@io, :closed? def_delegator :@write_buffer, :empty? - attr_reader :uri, :state + attr_reader :uri, :state, :pending def initialize(type, uri, options) @type = type @uri = uri - @hostnames = [@uri.host] + @origins = [@uri.origin] @options = Options.new(options) @window_size = @options.window_size @read_buffer = Buffer.new(BUFFER_SIZE) @write_buffer = Buffer.new(BUFFER_SIZE) @pending = [] - @state = :idle on(:error) { |ex| on_error(ex) } + transition(:idle) end def addresses=(addrs) @io = IO.registry(@type).new(@uri, addrs, @options) end @@ -86,45 +93,60 @@ # but internally, #mergeable? is called before #coalescable? def coalescable?(channel) if @io.protocol == "h2" && @uri.scheme == "https" @io.verify_hostname(channel.uri.host) else - @uri.host == channel.uri.host && - @uri.port == channel.uri.port && - @uri.scheme == channel.uri.scheme + @uri.origin == channel.uri.origin end end def merge(channel) - @hostnames += channel.instance_variable_get(:@hostnames) + @origins += channel.instance_variable_get(:@origins) pending = channel.instance_variable_get(:@pending) pending.each do |req, args| send(req, args) end end def unmerge(channel) - @hostnames -= channel.instance_variable_get(:@hostnames) + @origins -= channel.instance_variable_get(:@origins) + purge_pending do |request, args| + request.uri == channel.uri && begin + request.transition(:idle) + channel.send(request, *args) + true + end + end + end + + def purge_pending [@parser.pending, @pending].each do |pending| - pending.reject! do |request| - request.uri == channel.uri && begin - request.transition(:idle) - channel.send(request) - true - end + pending.reject! do |request, *args| + yield(request, args) end end end def match?(uri) return false if @state == :closing - @hostnames.include?(uri.host) && - uri.port == @uri.port && - uri.scheme == @uri.scheme + @origins.include?(uri.origin) || match_altsvcs?(uri) end + # checks if this is channel is an alternative service of + # +uri+ + def match_altsvcs?(uri) + AltSvc.cached_altsvc(@uri.origin).any? do |altsvc| + origin = altsvc["origin"] + origin.altsvc_match?(uri.origin) + end + end + + def connecting? + @state == :idle + end + def interests return :w if @state == :idle readable = !@read_buffer.full? writable = !@write_buffer.empty? if readable @@ -155,17 +177,19 @@ def send(request, **args) if @error_response emit(:response, request, @error_response) elsif @parser && !@write_buffer.full? + request.headers["alt-used"] = @uri.authority if match_altsvcs?(request.uri) parser.send(request, **args) else @pending << [request, args] end end def call + @timeout = @timeout_threshold case @state when :closed return when :closing dwrite @@ -180,10 +204,21 @@ def upgrade_parser(protocol) @parser.reset if @parser @parser = build_parser(protocol) end + def handle_timeout_error(e) + return emit(:error, e) unless @timeout + @timeout -= e.timeout + return unless @timeout <= 0 + if connecting? + emit(:error, e.to_connection_error) + else + emit(:error, e) + end + end + private def consume catch(:called) do dread @@ -202,10 +237,11 @@ return end return if siz.zero? log { "READ: #{siz} bytes..." } parser << @read_buffer.to_s + return if @state == :closing || @state == :closed end end def dwrite loop do @@ -217,10 +253,11 @@ on_error(ex) return end log { "WRITE: #{siz} bytes..." } return if siz.zero? + return if @state == :closing || @state == :closed end end def send_pending while !@write_buffer.full? && (req_args = @pending.shift) @@ -234,26 +271,37 @@ end def build_parser(protocol = @io.protocol) parser = registry(protocol).new(@write_buffer, @options) parser.on(:response) do |*args| + AltSvc.emit(*args) do |alt_origin, origin, alt_params| + emit(:altsvc, alt_origin, origin, alt_params) + end emit(:response, *args) end + parser.on(:altsvc) do |alt_origin, origin, alt_params| + emit(:altsvc, alt_origin, origin, alt_params) + end + parser.on(:promise) do |*args| emit(:promise, *args) end parser.on(:close) do transition(:closing) end parser.on(:reset) do transition(:closing) unless parser.empty? transition(:closed) + emit(:reset) transition(:idle) transition(:open) end end + parser.on(:timeout) do |timeout| + @timeout = timeout + end parser.on(:error) do |request, ex| case ex when MisdirectedRequestError emit(:uncoalesce, request.uri) else @@ -267,14 +315,18 @@ def transition(nextstate) case nextstate # when :idle when :idle @error_response = nil + @timeout_threshold = @options.timeout.connect_timeout + @timeout = @timeout_threshold when :open return if @state == :closed @io.connect return unless @io.connected? send_pending + @timeout_threshold = @options.timeout.operation_timeout + @timeout = @timeout_threshold emit(:open) when :closing return unless @state == :open when :closed return unless @state == :closing