lib/httpx/resolver/system.rb in httpx-0.18.7 vs lib/httpx/resolver/system.rb in httpx-0.19.0

- old
+ new

@@ -2,50 +2,206 @@ require "forwardable" require "resolv" module HTTPX - class Resolver::System - include Resolver::ResolverMixin + class Resolver::System < Resolver::Resolver + using URIExtensions + extend Forwardable RESOLV_ERRORS = [Resolv::ResolvError, Resolv::DNS::Requester::RequestError, Resolv::DNS::EncodeError, Resolv::DNS::DecodeError].freeze + DONE = 1 + ERROR = 2 + + class << self + def multi? + false + end + end + attr_reader :state + def_delegator :@connections, :empty? + def initialize(options) - @options = HTTPX::Options.new(options) + super(nil, options) @resolver_options = @options.resolver_options - @state = :idle resolv_options = @resolver_options.dup - timeouts = resolv_options.delete(:timeouts) + timeouts = resolv_options.delete(:timeouts) || Resolver::RESOLVE_TIMEOUT + @_timeouts = Array(timeouts) + @timeouts = Hash.new { |tims, host| tims[host] = @_timeouts.dup } resolv_options.delete(:cache) - @resolver = Resolv::DNS.new(resolv_options.empty? ? nil : resolv_options) - @resolver.timeouts = timeouts || Resolver::RESOLVE_TIMEOUT + @connections = [] + @queries = [] + @ips = [] + @pipe_mutex = Thread::Mutex.new + @state = :idle end + def resolvers + return enum_for(__method__) unless block_given? + + yield self + end + + def connections + EMPTY + end + + def close + transition(:closed) + end + def closed? - true + @state == :closed end - def empty? - true + def to_io + @pipe_read.to_io end + def call + case @state + when :open + consume + end + nil + end + + def interests + return if @queries.empty? + + :r + end + + def timeout + return unless @queries.empty? + + _, connection = @queries.first + + @timeouts[connection.origin.host].first + end + def <<(connection) + @connections << connection + resolve + end + + private + + def transition(nextstate) + case nextstate + when :idle + @timeouts.clear + when :open + return unless @state == :idle + + @pipe_read, @pipe_write = ::IO.pipe + when :closed + return unless @state == :open + + @pipe_write.close + @pipe_read.close + end + @state = nextstate + end + + def consume + return if @connections.empty? + + while @pipe_read.ready? && (event = @pipe_read.getbyte) + case event + when DONE + *pair, addrs = @pipe_mutex.synchronize { @ips.pop } + @queries.delete(pair) + + family, connection = pair + emit_addresses(connection, family, addrs) + when ERROR + *pair, error = @pipe_mutex.synchronize { @ips.pop } + @queries.delete(pair) + + family, connection = pair + emit_resolve_error(connection, connection.origin.host, error) + end + + @connections.delete(connection) if @queries.empty? + end + + return emit(:close, self) if @connections.empty? + + resolve + end + + def resolve(connection = @connections.first) + raise Error, "no URI to resolve" unless connection + return unless @queries.empty? + hostname = connection.origin.host - addresses = connection.addresses || - ip_resolve(hostname) || - system_resolve(hostname) || - @resolver.getaddresses(hostname) - throw(:resolve_error, resolve_error(hostname)) if addresses.empty? + scheme = connection.origin.scheme + log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname - emit_addresses(connection, addresses) - rescue Errno::EHOSTUNREACH, *RESOLV_ERRORS => e - emit_resolve_error(connection, hostname, e) + transition(:open) + + connection.options.ip_families.each do |family| + @queries << [family, connection] + end + async_resolve(connection, hostname, scheme) + consume end - def uncache(*); end + def async_resolve(connection, hostname, scheme) + families = connection.options.ip_families + log { "resolver: query for #{hostname}" } + resolve_timeout = @timeouts[connection.origin.host].first + + Thread.start do + Thread.current.report_on_exception = false + begin + addrs = if resolve_timeout + Timeout.timeout(resolve_timeout) do + __addrinfo_resolve(hostname, scheme) + end + else + __addrinfo_resolve(hostname, scheme) + end + addrs = addrs.sort_by(&:afamily).group_by(&:afamily) + families.each do |family| + addresses = addrs[family] + next unless addresses + + addresses.map!(&:ip_address) + addresses.uniq! + @pipe_mutex.synchronize do + @ips.unshift([family, connection, addresses]) + @pipe_write.putc(DONE) unless @pipe_write.closed? + end + end + rescue Timeout::Error => e + ex = ResolveTimeoutError.new(resolve_timeout, e.message) + ex.set_backtrace(ex.backtrace) + @pipe_mutex.synchronize do + families.each do |family| + @ips.unshift([family, connection, ex]) + @pipe_write.putc(ERROR) unless @pipe_write.closed? + end + end + rescue StandardError => e + @pipe_mutex.synchronize do + families.each do |family| + @ips.unshift([family, connection, e]) + @pipe_write.putc(ERROR) unless @pipe_write.closed? + end + end + end + end + end + + def __addrinfo_resolve(host, scheme) + Addrinfo.getaddrinfo(host, scheme, Socket::AF_UNSPEC, Socket::SOCK_STREAM) + end end end