lib/httpx/resolver/system.rb in httpx-0.18.7 vs lib/httpx/resolver/system.rb in httpx-0.19.0
- old
+ new
@@ -2,50 +2,206 @@
require "forwardable"
require "resolv"
module HTTPX
- class Resolver::System
- include Resolver::ResolverMixin
+ class Resolver::System < Resolver::Resolver
+ using URIExtensions
+ extend Forwardable
RESOLV_ERRORS = [Resolv::ResolvError,
Resolv::DNS::Requester::RequestError,
Resolv::DNS::EncodeError,
Resolv::DNS::DecodeError].freeze
+ DONE = 1
+ ERROR = 2
+
+ class << self
+ def multi?
+ false
+ end
+ end
+
attr_reader :state
+ def_delegator :@connections, :empty?
+
def initialize(options)
- @options = HTTPX::Options.new(options)
+ super(nil, options)
@resolver_options = @options.resolver_options
- @state = :idle
resolv_options = @resolver_options.dup
- timeouts = resolv_options.delete(:timeouts)
+ timeouts = resolv_options.delete(:timeouts) || Resolver::RESOLVE_TIMEOUT
+ @_timeouts = Array(timeouts)
+ @timeouts = Hash.new { |tims, host| tims[host] = @_timeouts.dup }
resolv_options.delete(:cache)
- @resolver = Resolv::DNS.new(resolv_options.empty? ? nil : resolv_options)
- @resolver.timeouts = timeouts || Resolver::RESOLVE_TIMEOUT
+ @connections = []
+ @queries = []
+ @ips = []
+ @pipe_mutex = Thread::Mutex.new
+ @state = :idle
end
+ def resolvers
+ return enum_for(__method__) unless block_given?
+
+ yield self
+ end
+
+ def connections
+ EMPTY
+ end
+
+ def close
+ transition(:closed)
+ end
+
def closed?
- true
+ @state == :closed
end
- def empty?
- true
+ def to_io
+ @pipe_read.to_io
end
+ def call
+ case @state
+ when :open
+ consume
+ end
+ nil
+ end
+
+ def interests
+ return if @queries.empty?
+
+ :r
+ end
+
+ def timeout
+ return unless @queries.empty?
+
+ _, connection = @queries.first
+
+ @timeouts[connection.origin.host].first
+ end
+
def <<(connection)
+ @connections << connection
+ resolve
+ end
+
+ private
+
+ def transition(nextstate)
+ case nextstate
+ when :idle
+ @timeouts.clear
+ when :open
+ return unless @state == :idle
+
+ @pipe_read, @pipe_write = ::IO.pipe
+ when :closed
+ return unless @state == :open
+
+ @pipe_write.close
+ @pipe_read.close
+ end
+ @state = nextstate
+ end
+
+ def consume
+ return if @connections.empty?
+
+ while @pipe_read.ready? && (event = @pipe_read.getbyte)
+ case event
+ when DONE
+ *pair, addrs = @pipe_mutex.synchronize { @ips.pop }
+ @queries.delete(pair)
+
+ family, connection = pair
+ emit_addresses(connection, family, addrs)
+ when ERROR
+ *pair, error = @pipe_mutex.synchronize { @ips.pop }
+ @queries.delete(pair)
+
+ family, connection = pair
+ emit_resolve_error(connection, connection.origin.host, error)
+ end
+
+ @connections.delete(connection) if @queries.empty?
+ end
+
+ return emit(:close, self) if @connections.empty?
+
+ resolve
+ end
+
+ def resolve(connection = @connections.first)
+ raise Error, "no URI to resolve" unless connection
+ return unless @queries.empty?
+
hostname = connection.origin.host
- addresses = connection.addresses ||
- ip_resolve(hostname) ||
- system_resolve(hostname) ||
- @resolver.getaddresses(hostname)
- throw(:resolve_error, resolve_error(hostname)) if addresses.empty?
+ scheme = connection.origin.scheme
+ log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname
- emit_addresses(connection, addresses)
- rescue Errno::EHOSTUNREACH, *RESOLV_ERRORS => e
- emit_resolve_error(connection, hostname, e)
+ transition(:open)
+
+ connection.options.ip_families.each do |family|
+ @queries << [family, connection]
+ end
+ async_resolve(connection, hostname, scheme)
+ consume
end
- def uncache(*); end
+ def async_resolve(connection, hostname, scheme)
+ families = connection.options.ip_families
+ log { "resolver: query for #{hostname}" }
+ resolve_timeout = @timeouts[connection.origin.host].first
+
+ Thread.start do
+ Thread.current.report_on_exception = false
+ begin
+ addrs = if resolve_timeout
+ Timeout.timeout(resolve_timeout) do
+ __addrinfo_resolve(hostname, scheme)
+ end
+ else
+ __addrinfo_resolve(hostname, scheme)
+ end
+ addrs = addrs.sort_by(&:afamily).group_by(&:afamily)
+ families.each do |family|
+ addresses = addrs[family]
+ next unless addresses
+
+ addresses.map!(&:ip_address)
+ addresses.uniq!
+ @pipe_mutex.synchronize do
+ @ips.unshift([family, connection, addresses])
+ @pipe_write.putc(DONE) unless @pipe_write.closed?
+ end
+ end
+ rescue Timeout::Error => e
+ ex = ResolveTimeoutError.new(resolve_timeout, e.message)
+ ex.set_backtrace(ex.backtrace)
+ @pipe_mutex.synchronize do
+ families.each do |family|
+ @ips.unshift([family, connection, ex])
+ @pipe_write.putc(ERROR) unless @pipe_write.closed?
+ end
+ end
+ rescue StandardError => e
+ @pipe_mutex.synchronize do
+ families.each do |family|
+ @ips.unshift([family, connection, e])
+ @pipe_write.putc(ERROR) unless @pipe_write.closed?
+ end
+ end
+ end
+ end
+ end
+
+ def __addrinfo_resolve(host, scheme)
+ Addrinfo.getaddrinfo(host, scheme, Socket::AF_UNSPEC, Socket::SOCK_STREAM)
+ end
end
end