lib/httpx/connection.rb in httpx-0.3.1 vs lib/httpx/connection.rb in httpx-0.4.0

- old
+ new

@@ -1,150 +1,372 @@ # frozen_string_literal: true -require "httpx/selector" -require "httpx/channel" -require "httpx/resolver" +require "resolv" +require "forwardable" +require "httpx/io" +require "httpx/buffer" module HTTPX + # The Connection can be watched for IO events. + # + # It contains the +io+ object to read/write from, and knows what to do when it can. + # + # It defers connecting until absolutely necessary. Connection should be triggered from + # the IO selector (until then, any request will be queued). + # + # A connection boots up its parser after connection is established. All pending requests + # will be redirected there after connection. + # + # A connection can be prevented from closing by the parser, that is, if there are pending + # requests. This will signal that the connection was prematurely closed, due to a possible + # number of conditions: + # + # * Remote peer closed the connection ("Connection: close"); + # * Remote peer doesn't support pipelining; + # + # A connection may also route requests for a different host for which the +io+ was connected + # to, provided that the IP is the same and the port and scheme as well. This will allow to + # share the same socket to send HTTP/2 requests to different hosts. + # class Connection - def initialize(options) + extend Forwardable + include Registry + include Loggable + include Callbacks + + using URIExtensions + + require "httpx/connection/http2" + require "httpx/connection/http1" + + BUFFER_SIZE = 1 << 14 + + def_delegator :@io, :closed? + + def_delegator :@write_buffer, :empty? + + attr_reader :origin, :state, :pending, :options + + attr_reader :timeout + + def initialize(type, uri, options) + @type = type + @origins = [uri.origin] + @origin = URI(uri.origin) @options = Options.new(options) - @timeout = options.timeout - resolver_type = @options.resolver_class - resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol) - @selector = Selector.new - @channels = [] - @connected_channels = 0 - @resolver = resolver_type.new(self, @options) - @resolver.on(:resolve, &method(:on_resolver_channel)) - @resolver.on(:error, &method(:on_resolver_error)) - @resolver.on(:close, &method(:on_resolver_close)) + @window_size = @options.window_size + @read_buffer = Buffer.new(BUFFER_SIZE) + @write_buffer = Buffer.new(BUFFER_SIZE) + @pending = [] + on(:error, &method(:on_error)) + if @options.io + # if there's an already open IO, get its + # peer address, and force-initiate the parser + transition(:already_open) + @io = IO.registry(@type).new(@origin, nil, @options) + parser + else + transition(:idle) + end end - def running? - !@channels.empty? + # this is a semi-private method, to be used by the resolver + # to initiate the io object. + def addresses=(addrs) + @io ||= IO.registry(@type).new(@origin, addrs, @options) # rubocop:disable Naming/MemoizedInstanceVariableName end - def next_tick - catch(:jump_tick) do - @selector.select(next_timeout) do |monitor| - if (channel = monitor.value) - channel.call - end - monitor.interests = channel.interests + def addresses + @io && @io.addresses + end + + def match?(uri, options) + return false if @state == :closing || @state == :closed + + (@origins.include?(uri.origin) || match_altsvcs?(uri)) && @options == options + end + + def mergeable?(connection) + return false if @state == :closing || @state == :closed || !@io + + !(@io.addresses & connection.addresses).empty? && @options == connection.options + end + + # coalescable connections need to be mergeable! + # but internally, #mergeable? is called before #coalescable? + def coalescable?(connection) + if @io.protocol == "h2" && @origin.scheme == "https" + @io.verify_hostname(connection.origin.host) + else + @origin == connection.origin + end + end + + def merge(connection) + @origins += connection.instance_variable_get(:@origins) + pending = connection.instance_variable_get(:@pending) + pending.each do |req, args| + send(req, args) + end + end + + def unmerge(connection) + @origins -= connection.instance_variable_get(:@origins) + purge_pending do |request| + request.uri.origin == connection.origin && begin + request.transition(:idle) + connection.send(request) + true end end - rescue TimeoutError => timeout_error - @channels.each do |ch| - ch.handle_timeout_error(timeout_error) + end + + def purge_pending + [@parser.pending, @pending].each do |pending| + pending.reject! do |request, *args| + yield(request, args) + end end - rescue Errno::ECONNRESET, - Errno::ECONNABORTED, - Errno::EPIPE => ex - @channels.each do |ch| - ch.emit(:error, ex) + end + + # checks if this is connection is an alternative service of + # +uri+ + def match_altsvcs?(uri) + AltSvc.cached_altsvc(@origin).any? do |altsvc| + origin = altsvc["origin"] + origin.altsvc_match?(uri.origin) end end - def close - @resolver.close unless @resolver.closed? - @channels.each(&:close) - next_tick until @channels.empty? + def connecting? + @state == :idle end - def build_channel(uri, **options) - channel = Channel.by(uri, @options.merge(options)) - resolve_channel(channel) - channel.on(:open) do - @connected_channels += 1 - @timeout.transition(:open) if @channels.size == @connected_channels + def inflight? + @parser && !@parser.empty? && !@write_buffer.empty? + end + + def interests + return :w if @state == :idle + + readable = !@read_buffer.full? + writable = !@write_buffer.empty? + if readable + writable ? :rw : :r + else + writable ? :w : :r end - channel.on(:reset) do - @timeout.transition(:idle) + end + + def to_io + case @state + when :idle + transition(:open) end - channel.on(:unreachable) do - @resolver.uncache(channel) - resolve_channel(channel) + @io.to_io + end + + def close + @parser.close if @parser + transition(:closing) + end + + def reset + transition(:closing) + transition(:closed) + emit(:close) + end + + def send(request) + if @error_response + emit(:response, request, @error_response) + elsif @parser && !@write_buffer.full? + request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri) + parser.send(request) + else + @pending << request end - channel end - # opens a channel to the IP reachable through +uri+. - # Many hostnames are reachable through the same IP, so we try to - # maximize pipelining by opening as few channels as possible. - # - def find_channel(uri) - @channels.find do |channel| - channel.match?(uri) + def call + @timeout = @timeout_threshold + case @state + when :closed + return + when :closing + dwrite + transition(:closed) + emit(:close) + when :open + consume end + nil end private - def resolve_channel(channel) - @channels << channel unless @channels.include?(channel) - @resolver << channel - return if @resolver.empty? - @_resolver_monitor ||= begin # rubocop:disable Naming/MemoizedInstanceVariableName - monitor = @selector.register(@resolver, :w) - monitor.value = @resolver - monitor + def consume + catch(:called) do + dread + dwrite + parser.consume end end - def on_resolver_channel(channel, addresses) - found_channel = @channels.find do |ch| - ch != channel && ch.mergeable?(addresses) + def dread(wsize = @window_size) + loop do + siz = @io.read(wsize, @read_buffer) + unless siz + ex = EOFError.new("descriptor closed") + ex.set_backtrace(caller) + on_error(ex) + return + end + return if siz.zero? + + log { "READ: #{siz} bytes..." } + parser << @read_buffer.to_s + return if @state == :closing || @state == :closed end - return register_channel(channel) unless found_channel - if found_channel.state == :open - coalesce_channels(found_channel, channel) - else - found_channel.once(:open) do - coalesce_channels(found_channel, channel) + end + + def dwrite + loop do + return if @write_buffer.empty? + + siz = @io.write(@write_buffer) + unless siz + ex = EOFError.new("descriptor closed") + ex.set_backtrace(caller) + on_error(ex) + return end + log { "WRITE: #{siz} bytes..." } + return if siz.zero? + return if @state == :closing || @state == :closed end end - def on_resolver_error(ch, error) - ch.emit(:error, error) - # must remove channel by hand, hasn't been started yet - unregister_channel(ch) + def send_pending + while !@write_buffer.full? && (req_args = @pending.shift) + request = req_args + parser.send(request) + end end - def on_resolver_close - @selector.deregister(@resolver) - @_resolver_monitor = nil - @resolver.close unless @resolver.closed? + def parser + @parser ||= build_parser end - def register_channel(channel) - @timeout.transition(:idle) - monitor = @selector.register(channel, :w) - monitor.value = channel - channel.on(:close) do - unregister_channel(channel) - end + def build_parser(protocol = @io.protocol) + parser = registry(protocol).new(@write_buffer, @options) + set_parser_callbacks(parser) + parser end - def unregister_channel(channel) - @channels.delete(channel) - @selector.deregister(channel) - @connected_channels -= 1 + def set_parser_callbacks(parser) + parser.on(:response) do |request, response| + AltSvc.emit(request, response) do |alt_origin, origin, alt_params| + emit(:altsvc, alt_origin, origin, alt_params) + end + request.emit(:response, response) + end + parser.on(:altsvc) do |alt_origin, origin, alt_params| + emit(:altsvc, alt_origin, origin, alt_params) + end + + parser.on(:promise) do |request, stream| + request.emit(:promise, parser, stream) + end + parser.on(:close) do + transition(:closing) + end + parser.on(:reset) do + transition(:closing) + unless parser.empty? + transition(:closed) + emit(:reset) + transition(:idle) + transition(:open) + end + end + parser.on(:timeout) do |timeout| + @timeout = timeout + end + parser.on(:error) do |request, ex| + case ex + when MisdirectedRequestError + emit(:uncoalesce, request.uri) + else + response = ErrorResponse.new(ex, @options) + request.emit(:response, response) + end + end end - def coalesce_channels(ch1, ch2) - if ch1.coalescable?(ch2) - ch1.merge(ch2) - @channels.delete(ch2) - else - register_channel(ch2) + def transition(nextstate) + case nextstate + when :idle + @error_response = nil + @timeout_threshold = @options.timeout.connect_timeout + @timeout = @timeout_threshold + when :open + return if @state == :closed + + @io.connect + return unless @io.connected? + + send_pending + @timeout_threshold = @options.timeout.operation_timeout + @timeout = @timeout_threshold + emit(:open) + when :closing + return unless @state == :open + when :closed + return unless @state == :closing + return unless @write_buffer.empty? + + @io.close + @read_buffer.clear + when :already_open + nextstate = :open + send_pending + @timeout_threshold = @options.timeout.operation_timeout + @timeout = @timeout_threshold end + @state = nextstate + rescue Errno::EHOSTUNREACH + # at this point, all addresses from the IO object have failed + reset + emit(:unreachable) + throw(:jump_tick) + rescue Errno::ECONNREFUSED, + Errno::EADDRNOTAVAIL, + Errno::EHOSTUNREACH, + OpenSSL::SSL::SSLError => e + # connect errors, exit gracefully + handle_error(e) + @state = :closed + emit(:close) end - def next_timeout - timeout = @timeout.timeout - return (@resolver.timeout || timeout) unless @resolver.closed? - timeout + def on_error(ex) + handle_error(ex) + reset + end + + def handle_error(e) + if e.instance_of?(TimeoutError) && @timeout + @timeout -= e.timeout + return unless @timeout <= 0 + + e = e.to_connection_error if connecting? + end + + parser.handle_error(e) if @parser && parser.respond_to?(:handle_error) + @error_response = ErrorResponse.new(e, @options) + @pending.each do |request, _| + request.emit(:response, @error_response) + end end end end