lib/httpx/resolver/https.rb in httpx-0.3.1 vs lib/httpx/resolver/https.rb in httpx-0.4.0

- old
+ new

@@ -20,145 +20,148 @@ DEFAULTS = { uri: NAMESERVER, use_get: false, }.freeze - def_delegator :@channels, :empty? + def_delegator :@connections, :empty? - def_delegators :@resolver_channel, :to_io, :call, :interests, :close + def_delegators :@resolver_connection, :to_io, :call, :interests, :close - def initialize(connection, options) - @connection = connection + def initialize(options) @options = Options.new(options) @resolver_options = Resolver::Options.new(DEFAULTS.merge(@options.resolver_options || {})) @_record_types = Hash.new { |types, host| types[host] = RECORD_TYPES.keys.dup } @queries = {} @requests = {} - @channels = [] + @connections = [] @uri = URI(@resolver_options.uri) @uri_addresses = nil end - def <<(channel) + def <<(connection) @uri_addresses ||= Resolv.getaddresses(@uri.host) if @uri_addresses.empty? - ex = ResolveError.new("Can't resolve #{channel.uri.host}") + ex = ResolveError.new("Can't resolve #{connection.origin.host}") ex.set_backtrace(caller) - emit(:error, channel, ex) + emit(:error, connection, ex) else - early_resolve(channel) || resolve(channel) + early_resolve(connection) || resolve(connection) end end def timeout - timeout = @options.timeout - timeout.timeout + @connections.map(&:timeout).min end def closed? - return true unless @resolver_channel - resolver_channel.closed? + return true unless @resolver_connection + + resolver_connection.closed? end private - def resolver_channel - @resolver_channel ||= find_channel(@uri, @options) + def pool + Thread.current[:httpx_connection_pool] ||= Pool.new end - def resolve(channel = @channels.first, hostname = nil) - return if @building_channel - hostname = hostname || @queries.key(channel) || channel.uri.host + def resolver_connection + @resolver_connection ||= pool.find_connection(@uri, @options) || begin + @building_connection = true + connection = @options.connection_class.new("ssl", @uri, @options.merge(ssl: { alpn_protocols: %w[h2] })) + pool.init_connection(connection, @options) + emit_addresses(connection, @uri_addresses) + @building_connection = false + connection + end + end + + def resolve(connection = @connections.first, hostname = nil) + return if @building_connection + + hostname = hostname || @queries.key(connection) || connection.origin.host type = @_record_types[hostname].first log(label: "resolver: ") { "query #{type} for #{hostname}" } begin request = build_request(hostname, type) - @requests[request] = channel - resolver_channel.send(request) - @queries[hostname] = channel - @channels << channel + @requests[request] = connection + resolver_connection.send(request) + @queries[hostname] = connection + @connections << connection rescue Resolv::DNS::EncodeError, JSON::JSONError => e - emit_resolve_error(channel, hostname, e) + emit_resolve_error(connection, hostname, e) end end - def find_channel(_request, **options) - @connection.find_channel(@uri) || begin - @building_channel = true - channel = @connection.build_channel(@uri, **options) - emit_addresses(channel, @uri_addresses) - set_channel_callbacks(channel) - @building_channel = false - channel - end - end - - def set_channel_callbacks(channel) - channel.on(:response, &method(:on_response)) - channel.on(:promise, &method(:on_response)) - end - def on_response(request, response) response.raise_for_status rescue Error => ex - channel = @requests[request] - hostname = @queries.key(channel) + connection = @requests[request] + hostname = @queries.key(connection) error = ResolveError.new("Can't resolve #{hostname}: #{ex.message}") error.set_backtrace(ex.backtrace) - emit(:error, channel, error) + emit(:error, connection, error) else parse(response) ensure @requests.delete(request) end + def on_promise(_, stream) + log(level: 2, label: "#{stream.id}: ") { "refusing stream!" } + stream.refuse + end + def parse(response) - answers = begin - decode_response_body(response) + begin + answers = decode_response_body(response) rescue Resolv::DNS::DecodeError, JSON::JSONError => e - host, channel = @queries.first + host, connection = @queries.first if @_record_types[host].empty? - emit_resolve_error(channel, host, e) + emit_resolve_error(connection, host, e) return end end if answers.empty? - host, channel = @queries.first + host, connection = @queries.first @_record_types[host].shift if @_record_types[host].empty? @_record_types.delete(host) - emit_resolve_error(channel, host) + emit_resolve_error(connection, host) return end else answers = answers.group_by { |answer| answer["name"] } answers.each do |hostname, addresses| addresses = addresses.flat_map do |address| if address.key?("alias") alias_address = answers[address["alias"]] if alias_address.nil? - channel = @queries[hostname] + connection = @queries[hostname] @queries.delete(address["name"]) - resolve(channel, address["alias"]) + resolve(connection, address["alias"]) return # rubocop:disable Lint/NonLocalExitFromIterator else alias_address end else address end end.compact next if addresses.empty? + hostname = hostname[0..-2] if hostname.end_with?(".") - channel = @queries.delete(hostname) - next unless channel # probably a retried query for which there's an answer - @channels.delete(channel) + connection = @queries.delete(hostname) + next unless connection # probably a retried query for which there's an answer + + @connections.delete(connection) Resolver.cached_lookup_set(hostname, addresses) - emit_addresses(channel, addresses.map { |addr| addr["data"] }) + emit_addresses(connection, addresses.map { |addr| addr["data"] }) end end - return if @channels.empty? + return if @connections.empty? + resolve end def build_request(hostname, type) uri = @uri.dup @@ -173,10 +176,12 @@ payload = Resolver.encode_dns_query(hostname, type: RECORD_TYPES[type]) request = rklass.new("POST", uri, @options.merge(body: [payload])) request.headers["content-type"] = "application/dns-message" request.headers["accept"] = "application/dns-message" end + request.on(:response, &method(:on_response).curry[request]) + request.on(:promise, &method(:on_promise)) request end def decode_response_body(response) case response.headers["content-type"] @@ -186,10 +191,11 @@ payload = JSON.parse(response.to_s) payload["Answer"] when "application/dns-udpwireformat", "application/dns-message" Resolver.decode_dns_answer(response.to_s) - # TODO: what about the rest? + else + raise Error, "unsupported DNS mime-type (#{response.headers["content-type"]})" end end end end