lib/httpx/connection.rb in httpx-0.3.1 vs lib/httpx/connection.rb in httpx-0.4.0
- old
+ new
@@ -1,150 +1,372 @@
# frozen_string_literal: true
-require "httpx/selector"
-require "httpx/channel"
-require "httpx/resolver"
+require "resolv"
+require "forwardable"
+require "httpx/io"
+require "httpx/buffer"
module HTTPX
+ # The Connection can be watched for IO events.
+ #
+ # It contains the +io+ object to read/write from, and knows what to do when it can.
+ #
+ # It defers connecting until absolutely necessary. Connection should be triggered from
+ # the IO selector (until then, any request will be queued).
+ #
+ # A connection boots up its parser after connection is established. All pending requests
+ # will be redirected there after connection.
+ #
+ # A connection can be prevented from closing by the parser, that is, if there are pending
+ # requests. This will signal that the connection was prematurely closed, due to a possible
+ # number of conditions:
+ #
+ # * Remote peer closed the connection ("Connection: close");
+ # * Remote peer doesn't support pipelining;
+ #
+ # A connection may also route requests for a different host for which the +io+ was connected
+ # to, provided that the IP is the same and the port and scheme as well. This will allow to
+ # share the same socket to send HTTP/2 requests to different hosts.
+ #
class Connection
- def initialize(options)
+ extend Forwardable
+ include Registry
+ include Loggable
+ include Callbacks
+
+ using URIExtensions
+
+ require "httpx/connection/http2"
+ require "httpx/connection/http1"
+
+ BUFFER_SIZE = 1 << 14
+
+ def_delegator :@io, :closed?
+
+ def_delegator :@write_buffer, :empty?
+
+ attr_reader :origin, :state, :pending, :options
+
+ attr_reader :timeout
+
+ def initialize(type, uri, options)
+ @type = type
+ @origins = [uri.origin]
+ @origin = URI(uri.origin)
@options = Options.new(options)
- @timeout = options.timeout
- resolver_type = @options.resolver_class
- resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
- @selector = Selector.new
- @channels = []
- @connected_channels = 0
- @resolver = resolver_type.new(self, @options)
- @resolver.on(:resolve, &method(:on_resolver_channel))
- @resolver.on(:error, &method(:on_resolver_error))
- @resolver.on(:close, &method(:on_resolver_close))
+ @window_size = @options.window_size
+ @read_buffer = Buffer.new(BUFFER_SIZE)
+ @write_buffer = Buffer.new(BUFFER_SIZE)
+ @pending = []
+ on(:error, &method(:on_error))
+ if @options.io
+ # if there's an already open IO, get its
+ # peer address, and force-initiate the parser
+ transition(:already_open)
+ @io = IO.registry(@type).new(@origin, nil, @options)
+ parser
+ else
+ transition(:idle)
+ end
end
- def running?
- !@channels.empty?
+ # this is a semi-private method, to be used by the resolver
+ # to initiate the io object.
+ def addresses=(addrs)
+ @io ||= IO.registry(@type).new(@origin, addrs, @options) # rubocop:disable Naming/MemoizedInstanceVariableName
end
- def next_tick
- catch(:jump_tick) do
- @selector.select(next_timeout) do |monitor|
- if (channel = monitor.value)
- channel.call
- end
- monitor.interests = channel.interests
+ def addresses
+ @io && @io.addresses
+ end
+
+ def match?(uri, options)
+ return false if @state == :closing || @state == :closed
+
+ (@origins.include?(uri.origin) || match_altsvcs?(uri)) && @options == options
+ end
+
+ def mergeable?(connection)
+ return false if @state == :closing || @state == :closed || !@io
+
+ !(@io.addresses & connection.addresses).empty? && @options == connection.options
+ end
+
+ # coalescable connections need to be mergeable!
+ # but internally, #mergeable? is called before #coalescable?
+ def coalescable?(connection)
+ if @io.protocol == "h2" && @origin.scheme == "https"
+ @io.verify_hostname(connection.origin.host)
+ else
+ @origin == connection.origin
+ end
+ end
+
+ def merge(connection)
+ @origins += connection.instance_variable_get(:@origins)
+ pending = connection.instance_variable_get(:@pending)
+ pending.each do |req, args|
+ send(req, args)
+ end
+ end
+
+ def unmerge(connection)
+ @origins -= connection.instance_variable_get(:@origins)
+ purge_pending do |request|
+ request.uri.origin == connection.origin && begin
+ request.transition(:idle)
+ connection.send(request)
+ true
end
end
- rescue TimeoutError => timeout_error
- @channels.each do |ch|
- ch.handle_timeout_error(timeout_error)
+ end
+
+ def purge_pending
+ [@parser.pending, @pending].each do |pending|
+ pending.reject! do |request, *args|
+ yield(request, args)
+ end
end
- rescue Errno::ECONNRESET,
- Errno::ECONNABORTED,
- Errno::EPIPE => ex
- @channels.each do |ch|
- ch.emit(:error, ex)
+ end
+
+ # checks if this is connection is an alternative service of
+ # +uri+
+ def match_altsvcs?(uri)
+ AltSvc.cached_altsvc(@origin).any? do |altsvc|
+ origin = altsvc["origin"]
+ origin.altsvc_match?(uri.origin)
end
end
- def close
- @resolver.close unless @resolver.closed?
- @channels.each(&:close)
- next_tick until @channels.empty?
+ def connecting?
+ @state == :idle
end
- def build_channel(uri, **options)
- channel = Channel.by(uri, @options.merge(options))
- resolve_channel(channel)
- channel.on(:open) do
- @connected_channels += 1
- @timeout.transition(:open) if @channels.size == @connected_channels
+ def inflight?
+ @parser && !@parser.empty? && !@write_buffer.empty?
+ end
+
+ def interests
+ return :w if @state == :idle
+
+ readable = !@read_buffer.full?
+ writable = !@write_buffer.empty?
+ if readable
+ writable ? :rw : :r
+ else
+ writable ? :w : :r
end
- channel.on(:reset) do
- @timeout.transition(:idle)
+ end
+
+ def to_io
+ case @state
+ when :idle
+ transition(:open)
end
- channel.on(:unreachable) do
- @resolver.uncache(channel)
- resolve_channel(channel)
+ @io.to_io
+ end
+
+ def close
+ @parser.close if @parser
+ transition(:closing)
+ end
+
+ def reset
+ transition(:closing)
+ transition(:closed)
+ emit(:close)
+ end
+
+ def send(request)
+ if @error_response
+ emit(:response, request, @error_response)
+ elsif @parser && !@write_buffer.full?
+ request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri)
+ parser.send(request)
+ else
+ @pending << request
end
- channel
end
- # opens a channel to the IP reachable through +uri+.
- # Many hostnames are reachable through the same IP, so we try to
- # maximize pipelining by opening as few channels as possible.
- #
- def find_channel(uri)
- @channels.find do |channel|
- channel.match?(uri)
+ def call
+ @timeout = @timeout_threshold
+ case @state
+ when :closed
+ return
+ when :closing
+ dwrite
+ transition(:closed)
+ emit(:close)
+ when :open
+ consume
end
+ nil
end
private
- def resolve_channel(channel)
- @channels << channel unless @channels.include?(channel)
- @resolver << channel
- return if @resolver.empty?
- @_resolver_monitor ||= begin # rubocop:disable Naming/MemoizedInstanceVariableName
- monitor = @selector.register(@resolver, :w)
- monitor.value = @resolver
- monitor
+ def consume
+ catch(:called) do
+ dread
+ dwrite
+ parser.consume
end
end
- def on_resolver_channel(channel, addresses)
- found_channel = @channels.find do |ch|
- ch != channel && ch.mergeable?(addresses)
+ def dread(wsize = @window_size)
+ loop do
+ siz = @io.read(wsize, @read_buffer)
+ unless siz
+ ex = EOFError.new("descriptor closed")
+ ex.set_backtrace(caller)
+ on_error(ex)
+ return
+ end
+ return if siz.zero?
+
+ log { "READ: #{siz} bytes..." }
+ parser << @read_buffer.to_s
+ return if @state == :closing || @state == :closed
end
- return register_channel(channel) unless found_channel
- if found_channel.state == :open
- coalesce_channels(found_channel, channel)
- else
- found_channel.once(:open) do
- coalesce_channels(found_channel, channel)
+ end
+
+ def dwrite
+ loop do
+ return if @write_buffer.empty?
+
+ siz = @io.write(@write_buffer)
+ unless siz
+ ex = EOFError.new("descriptor closed")
+ ex.set_backtrace(caller)
+ on_error(ex)
+ return
end
+ log { "WRITE: #{siz} bytes..." }
+ return if siz.zero?
+ return if @state == :closing || @state == :closed
end
end
- def on_resolver_error(ch, error)
- ch.emit(:error, error)
- # must remove channel by hand, hasn't been started yet
- unregister_channel(ch)
+ def send_pending
+ while !@write_buffer.full? && (req_args = @pending.shift)
+ request = req_args
+ parser.send(request)
+ end
end
- def on_resolver_close
- @selector.deregister(@resolver)
- @_resolver_monitor = nil
- @resolver.close unless @resolver.closed?
+ def parser
+ @parser ||= build_parser
end
- def register_channel(channel)
- @timeout.transition(:idle)
- monitor = @selector.register(channel, :w)
- monitor.value = channel
- channel.on(:close) do
- unregister_channel(channel)
- end
+ def build_parser(protocol = @io.protocol)
+ parser = registry(protocol).new(@write_buffer, @options)
+ set_parser_callbacks(parser)
+ parser
end
- def unregister_channel(channel)
- @channels.delete(channel)
- @selector.deregister(channel)
- @connected_channels -= 1
+ def set_parser_callbacks(parser)
+ parser.on(:response) do |request, response|
+ AltSvc.emit(request, response) do |alt_origin, origin, alt_params|
+ emit(:altsvc, alt_origin, origin, alt_params)
+ end
+ request.emit(:response, response)
+ end
+ parser.on(:altsvc) do |alt_origin, origin, alt_params|
+ emit(:altsvc, alt_origin, origin, alt_params)
+ end
+
+ parser.on(:promise) do |request, stream|
+ request.emit(:promise, parser, stream)
+ end
+ parser.on(:close) do
+ transition(:closing)
+ end
+ parser.on(:reset) do
+ transition(:closing)
+ unless parser.empty?
+ transition(:closed)
+ emit(:reset)
+ transition(:idle)
+ transition(:open)
+ end
+ end
+ parser.on(:timeout) do |timeout|
+ @timeout = timeout
+ end
+ parser.on(:error) do |request, ex|
+ case ex
+ when MisdirectedRequestError
+ emit(:uncoalesce, request.uri)
+ else
+ response = ErrorResponse.new(ex, @options)
+ request.emit(:response, response)
+ end
+ end
end
- def coalesce_channels(ch1, ch2)
- if ch1.coalescable?(ch2)
- ch1.merge(ch2)
- @channels.delete(ch2)
- else
- register_channel(ch2)
+ def transition(nextstate)
+ case nextstate
+ when :idle
+ @error_response = nil
+ @timeout_threshold = @options.timeout.connect_timeout
+ @timeout = @timeout_threshold
+ when :open
+ return if @state == :closed
+
+ @io.connect
+ return unless @io.connected?
+
+ send_pending
+ @timeout_threshold = @options.timeout.operation_timeout
+ @timeout = @timeout_threshold
+ emit(:open)
+ when :closing
+ return unless @state == :open
+ when :closed
+ return unless @state == :closing
+ return unless @write_buffer.empty?
+
+ @io.close
+ @read_buffer.clear
+ when :already_open
+ nextstate = :open
+ send_pending
+ @timeout_threshold = @options.timeout.operation_timeout
+ @timeout = @timeout_threshold
end
+ @state = nextstate
+ rescue Errno::EHOSTUNREACH
+ # at this point, all addresses from the IO object have failed
+ reset
+ emit(:unreachable)
+ throw(:jump_tick)
+ rescue Errno::ECONNREFUSED,
+ Errno::EADDRNOTAVAIL,
+ Errno::EHOSTUNREACH,
+ OpenSSL::SSL::SSLError => e
+ # connect errors, exit gracefully
+ handle_error(e)
+ @state = :closed
+ emit(:close)
end
- def next_timeout
- timeout = @timeout.timeout
- return (@resolver.timeout || timeout) unless @resolver.closed?
- timeout
+ def on_error(ex)
+ handle_error(ex)
+ reset
+ end
+
+ def handle_error(e)
+ if e.instance_of?(TimeoutError) && @timeout
+ @timeout -= e.timeout
+ return unless @timeout <= 0
+
+ e = e.to_connection_error if connecting?
+ end
+
+ parser.handle_error(e) if @parser && parser.respond_to?(:handle_error)
+ @error_response = ErrorResponse.new(e, @options)
+ @pending.each do |request, _|
+ request.emit(:response, @error_response)
+ end
end
end
end