lib/httpx/connection.rb in httpx-0.1.0 vs lib/httpx/connection.rb in httpx-0.2.0

- old
+ new

@@ -1,30 +1,38 @@ # frozen_string_literal: true require "httpx/selector" require "httpx/channel" +require "httpx/resolver" module HTTPX class Connection def initialize(options) @options = Options.new(options) @timeout = options.timeout + resolver_type = @options.resolver_class + resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol) @selector = Selector.new @channels = [] + @resolver = resolver_type.new(self, @options) + @resolver.on(:resolve, &method(:on_resolver_channel)) + @resolver.on(:error, &method(:on_resolver_error)) + @resolver.on(:close, &method(:on_resolver_close)) end def running? !@channels.empty? end def next_tick - timeout = @timeout.timeout - @selector.select(timeout) do |monitor| - if (channel = monitor.value) - channel.call + catch(:jump_tick) do + @selector.select(next_timeout) do |monitor| + if (channel = monitor.value) + channel.call + end + monitor.interests = channel.interests end - monitor.interests = channel.interests end rescue TimeoutError, Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPIPE => ex @@ -32,17 +40,22 @@ ch.emit(:error, ex) end end def close + @resolver.close unless @resolver.closed? @channels.each(&:close) next_tick until @channels.empty? end def build_channel(uri, **options) channel = Channel.by(uri, @options.merge(options)) - register_channel(channel) + resolve_channel(channel) + channel.once(:unreachable) do + @resolver.uncache(channel) + resolve_channel(channel) + end channel end # opens a channel to the IP reachable through +uri+. # Many hostnames are reachable through the same IP, so we try to @@ -54,16 +67,71 @@ end end private + def resolve_channel(channel) + @channels << channel unless @channels.include?(channel) + @resolver << channel + return if @resolver.empty? + @_resolver_monitor ||= begin # rubocop:disable Naming/MemoizedInstanceVariableName + monitor = @selector.register(@resolver, :w) + monitor.value = @resolver + monitor + end + end + + def on_resolver_channel(channel, addresses) + found_channel = @channels.find do |ch| + ch != channel && ch.mergeable?(addresses) + end + return register_channel(channel) unless found_channel + if found_channel.state == :open + coalesce_channels(found_channel, channel) + else + found_channel.once(:open) do + coalesce_channels(found_channel, channel) + end + end + end + + def on_resolver_error(ch, error) + ch.emit(:error, error) + # must remove channel by hand, hasn't been started yet + unregister_channel(ch) + end + + def on_resolver_close + @selector.deregister(@resolver) + @_resolver_monitor = nil + @resolver.close unless @resolver.closed? + end + def register_channel(channel) monitor = @selector.register(channel, :w) monitor.value = channel channel.on(:close) do - @channels.delete(channel) - @selector.deregister(channel) + unregister_channel(channel) end - @channels << channel + end + + def unregister_channel(channel) + @channels.delete(channel) + @selector.deregister(channel) + end + + def next_timeout + timeout = @timeout.timeout # force log time + return (@resolver.timeout || timeout) unless @resolver.closed? + timeout + end + + def coalesce_channels(ch1, ch2) + if ch1.coalescable?(ch2) + ch1.merge(ch2) + @channels.delete(ch2) + else + register_channel(ch2) + end end end end