lib/httpx/resolver/native.rb in httpx-0.3.1 vs lib/httpx/resolver/native.rb in httpx-0.4.0

- old
+ new

@@ -31,21 +31,21 @@ }.freeze end DNS_PORT = 53 - def_delegator :@channels, :empty? + def_delegator :@connections, :empty? - def initialize(_, options) + def initialize(options) @options = Options.new(options) @ns_index = 0 @resolver_options = Resolver::Options.new(DEFAULTS.merge(@options.resolver_options || {})) @nameserver = @resolver_options.nameserver @_timeouts = Array(@resolver_options.timeouts) @timeouts = Hash.new { |timeouts, host| timeouts[host] = @_timeouts.dup } @_record_types = Hash.new { |types, host| types[host] = @resolver_options.record_types.dup } - @channels = [] + @connections = [] @queries = {} @read_buffer = Buffer.new(@resolver_options.packet_size) @write_buffer = Buffer.new(@resolver_options.packet_size) @state = :idle end @@ -79,12 +79,12 @@ rescue Errno::EHOSTUNREACH => e @ns_index += 1 if @ns_index < @nameserver.size transition(:idle) else - @queries.each do |host, channel| - emit_resolve_error(channel, host, e) + @queries.each do |host, connection| + emit_resolve_error(connection, host, e) end end end def interests @@ -95,18 +95,20 @@ else writable ? :w : :r end end - def <<(channel) - return if early_resolve(channel) + def <<(connection) + return if early_resolve(connection) + if @nameserver.nil? - ex = ResolveError.new("Can't resolve #{channel.uri.host}: no nameserver") + ex = ResolveError.new("Can't resolve #{connection.origin.host}: no nameserver") ex.set_backtrace(caller) - emit(:error, channel, ex) + emit(:error, connection, ex) else - @channels << channel + @connections << connection + resolve end end def timeout @start_timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) @@ -122,53 +124,56 @@ dwrite end def do_retry return if @queries.empty? + loop_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_timeout - channels = [] + connections = [] queries = {} while (query = @queries.shift) - h, channel = query - host = channel.uri.host + h, connection = query + host = connection.origin.host timeout = (@timeouts[host][0] -= loop_time) unless timeout.negative? - queries[h] = channel + queries[h] = connection next end @timeouts[host].shift if @timeouts[host].empty? @timeouts.delete(host) - emit_resolve_error(channel, host) + emit_resolve_error(connection, host) return else - channels << channel + connections << connection log(label: "resolver: ") do "timeout after #{prev_timeout}s, retry(#{timeouts.first}) #{host}..." end end end @queries = queries - channels.each { |ch| resolve(ch) } + connections.each { |ch| resolve(ch) } end def dread(wsize = @read_buffer.limit) loop do siz = @io.read(wsize, @read_buffer) unless siz emit(:close) return end return if siz.zero? + log(label: "resolver: ") { "READ: #{siz} bytes..." } parse(@read_buffer.to_s) end end def dwrite loop do return if @write_buffer.empty? + siz = @io.write(@write_buffer) unless siz emit(:close) return end @@ -176,66 +181,70 @@ return if siz.zero? end end def parse(buffer) - addresses = begin - Resolver.decode_dns_answer(buffer) + begin + addresses = Resolver.decode_dns_answer(buffer) rescue Resolv::DNS::DecodeError => e - hostname, channel = @queries.first + hostname, connection = @queries.first if @_record_types[hostname].empty? - emit_resolve_error(channel, hostname, e) + emit_resolve_error(connection, hostname, e) return end end if addresses.empty? - hostname, channel = @queries.first + hostname, connection = @queries.first @_record_types[hostname].shift if @_record_types[hostname].empty? @_record_types.delete(hostname) - emit_resolve_error(channel, hostname) + emit_resolve_error(connection, hostname) return end else address = addresses.first - channel = @queries.delete(address["name"]) - return unless channel # probably a retried query for which there's an answer + connection = @queries.delete(address["name"]) + return unless connection # probably a retried query for which there's an answer + if address.key?("alias") # CNAME - if early_resolve(channel, hostname: address["alias"]) - @channels.delete(channel) + if early_resolve(connection, hostname: address["alias"]) + @connections.delete(connection) else - resolve(channel, address["alias"]) + resolve(connection, address["alias"]) @queries.delete(address["name"]) return end else - @channels.delete(channel) - Resolver.cached_lookup_set(channel.uri.host, addresses) - emit_addresses(channel, addresses.map { |addr| addr["data"] }) + @connections.delete(connection) + Resolver.cached_lookup_set(connection.origin.host, addresses) + emit_addresses(connection, addresses.map { |addr| addr["data"] }) end end - return emit(:close) if @channels.empty? + return emit(:close) if @connections.empty? + resolve end - def resolve(channel = @channels.first, hostname = nil) - raise Error, "no URI to resolve" unless channel + def resolve(connection = @connections.first, hostname = nil) + raise Error, "no URI to resolve" unless connection return unless @write_buffer.empty? - hostname = hostname || @queries.key(channel) || channel.uri.host - @queries[hostname] = channel + + hostname = hostname || @queries.key(connection) || connection.origin.host + @queries[hostname] = connection type = @_record_types[hostname].first log(label: "resolver: ") { "query #{type} for #{hostname}" } begin @write_buffer << Resolver.encode_dns_query(hostname, type: RECORD_TYPES[type]) rescue Resolv::DNS::EncodeError => e - emit_resolve_error(channel, hostname, e) + emit_resolve_error(connection, hostname, e) end end def build_socket return if @io + ip, port = @nameserver[@ns_index] port ||= DNS_PORT uri = URI::Generic.build(scheme: "udp", port: port) uri.hostname = ip type = IO.registry(uri.scheme) @@ -251,14 +260,16 @@ @io = nil end @timeouts.clear when :open return unless @state == :idle + build_socket @io.connect return unless @io.connected? when :closed return unless @state == :open + @io.close if @io end @state = nextstate end end