lib/httpx/resolver/https.rb in httpx-0.3.1 vs lib/httpx/resolver/https.rb in httpx-0.4.0
- old
+ new
@@ -20,145 +20,148 @@
DEFAULTS = {
uri: NAMESERVER,
use_get: false,
}.freeze
- def_delegator :@channels, :empty?
+ def_delegator :@connections, :empty?
- def_delegators :@resolver_channel, :to_io, :call, :interests, :close
+ def_delegators :@resolver_connection, :to_io, :call, :interests, :close
- def initialize(connection, options)
- @connection = connection
+ def initialize(options)
@options = Options.new(options)
@resolver_options = Resolver::Options.new(DEFAULTS.merge(@options.resolver_options || {}))
@_record_types = Hash.new { |types, host| types[host] = RECORD_TYPES.keys.dup }
@queries = {}
@requests = {}
- @channels = []
+ @connections = []
@uri = URI(@resolver_options.uri)
@uri_addresses = nil
end
- def <<(channel)
+ def <<(connection)
@uri_addresses ||= Resolv.getaddresses(@uri.host)
if @uri_addresses.empty?
- ex = ResolveError.new("Can't resolve #{channel.uri.host}")
+ ex = ResolveError.new("Can't resolve #{connection.origin.host}")
ex.set_backtrace(caller)
- emit(:error, channel, ex)
+ emit(:error, connection, ex)
else
- early_resolve(channel) || resolve(channel)
+ early_resolve(connection) || resolve(connection)
end
end
def timeout
- timeout = @options.timeout
- timeout.timeout
+ @connections.map(&:timeout).min
end
def closed?
- return true unless @resolver_channel
- resolver_channel.closed?
+ return true unless @resolver_connection
+
+ resolver_connection.closed?
end
private
- def resolver_channel
- @resolver_channel ||= find_channel(@uri, @options)
+ def pool
+ Thread.current[:httpx_connection_pool] ||= Pool.new
end
- def resolve(channel = @channels.first, hostname = nil)
- return if @building_channel
- hostname = hostname || @queries.key(channel) || channel.uri.host
+ def resolver_connection
+ @resolver_connection ||= pool.find_connection(@uri, @options) || begin
+ @building_connection = true
+ connection = @options.connection_class.new("ssl", @uri, @options.merge(ssl: { alpn_protocols: %w[h2] }))
+ pool.init_connection(connection, @options)
+ emit_addresses(connection, @uri_addresses)
+ @building_connection = false
+ connection
+ end
+ end
+
+ def resolve(connection = @connections.first, hostname = nil)
+ return if @building_connection
+
+ hostname = hostname || @queries.key(connection) || connection.origin.host
type = @_record_types[hostname].first
log(label: "resolver: ") { "query #{type} for #{hostname}" }
begin
request = build_request(hostname, type)
- @requests[request] = channel
- resolver_channel.send(request)
- @queries[hostname] = channel
- @channels << channel
+ @requests[request] = connection
+ resolver_connection.send(request)
+ @queries[hostname] = connection
+ @connections << connection
rescue Resolv::DNS::EncodeError, JSON::JSONError => e
- emit_resolve_error(channel, hostname, e)
+ emit_resolve_error(connection, hostname, e)
end
end
- def find_channel(_request, **options)
- @connection.find_channel(@uri) || begin
- @building_channel = true
- channel = @connection.build_channel(@uri, **options)
- emit_addresses(channel, @uri_addresses)
- set_channel_callbacks(channel)
- @building_channel = false
- channel
- end
- end
-
- def set_channel_callbacks(channel)
- channel.on(:response, &method(:on_response))
- channel.on(:promise, &method(:on_response))
- end
-
def on_response(request, response)
response.raise_for_status
rescue Error => ex
- channel = @requests[request]
- hostname = @queries.key(channel)
+ connection = @requests[request]
+ hostname = @queries.key(connection)
error = ResolveError.new("Can't resolve #{hostname}: #{ex.message}")
error.set_backtrace(ex.backtrace)
- emit(:error, channel, error)
+ emit(:error, connection, error)
else
parse(response)
ensure
@requests.delete(request)
end
+ def on_promise(_, stream)
+ log(level: 2, label: "#{stream.id}: ") { "refusing stream!" }
+ stream.refuse
+ end
+
def parse(response)
- answers = begin
- decode_response_body(response)
+ begin
+ answers = decode_response_body(response)
rescue Resolv::DNS::DecodeError, JSON::JSONError => e
- host, channel = @queries.first
+ host, connection = @queries.first
if @_record_types[host].empty?
- emit_resolve_error(channel, host, e)
+ emit_resolve_error(connection, host, e)
return
end
end
if answers.empty?
- host, channel = @queries.first
+ host, connection = @queries.first
@_record_types[host].shift
if @_record_types[host].empty?
@_record_types.delete(host)
- emit_resolve_error(channel, host)
+ emit_resolve_error(connection, host)
return
end
else
answers = answers.group_by { |answer| answer["name"] }
answers.each do |hostname, addresses|
addresses = addresses.flat_map do |address|
if address.key?("alias")
alias_address = answers[address["alias"]]
if alias_address.nil?
- channel = @queries[hostname]
+ connection = @queries[hostname]
@queries.delete(address["name"])
- resolve(channel, address["alias"])
+ resolve(connection, address["alias"])
return # rubocop:disable Lint/NonLocalExitFromIterator
else
alias_address
end
else
address
end
end.compact
next if addresses.empty?
+
hostname = hostname[0..-2] if hostname.end_with?(".")
- channel = @queries.delete(hostname)
- next unless channel # probably a retried query for which there's an answer
- @channels.delete(channel)
+ connection = @queries.delete(hostname)
+ next unless connection # probably a retried query for which there's an answer
+
+ @connections.delete(connection)
Resolver.cached_lookup_set(hostname, addresses)
- emit_addresses(channel, addresses.map { |addr| addr["data"] })
+ emit_addresses(connection, addresses.map { |addr| addr["data"] })
end
end
- return if @channels.empty?
+ return if @connections.empty?
+
resolve
end
def build_request(hostname, type)
uri = @uri.dup
@@ -173,10 +176,12 @@
payload = Resolver.encode_dns_query(hostname, type: RECORD_TYPES[type])
request = rklass.new("POST", uri, @options.merge(body: [payload]))
request.headers["content-type"] = "application/dns-message"
request.headers["accept"] = "application/dns-message"
end
+ request.on(:response, &method(:on_response).curry[request])
+ request.on(:promise, &method(:on_promise))
request
end
def decode_response_body(response)
case response.headers["content-type"]
@@ -186,10 +191,11 @@
payload = JSON.parse(response.to_s)
payload["Answer"]
when "application/dns-udpwireformat",
"application/dns-message"
Resolver.decode_dns_answer(response.to_s)
- # TODO: what about the rest?
+ else
+ raise Error, "unsupported DNS mime-type (#{response.headers["content-type"]})"
end
end
end
end