lib/httpx/connection.rb in httpx-0.1.0 vs lib/httpx/connection.rb in httpx-0.2.0
- old
+ new
@@ -1,30 +1,38 @@
# frozen_string_literal: true
require "httpx/selector"
require "httpx/channel"
+require "httpx/resolver"
module HTTPX
class Connection
def initialize(options)
@options = Options.new(options)
@timeout = options.timeout
+ resolver_type = @options.resolver_class
+ resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
@selector = Selector.new
@channels = []
+ @resolver = resolver_type.new(self, @options)
+ @resolver.on(:resolve, &method(:on_resolver_channel))
+ @resolver.on(:error, &method(:on_resolver_error))
+ @resolver.on(:close, &method(:on_resolver_close))
end
def running?
!@channels.empty?
end
def next_tick
- timeout = @timeout.timeout
- @selector.select(timeout) do |monitor|
- if (channel = monitor.value)
- channel.call
+ catch(:jump_tick) do
+ @selector.select(next_timeout) do |monitor|
+ if (channel = monitor.value)
+ channel.call
+ end
+ monitor.interests = channel.interests
end
- monitor.interests = channel.interests
end
rescue TimeoutError,
Errno::ECONNRESET,
Errno::ECONNABORTED,
Errno::EPIPE => ex
@@ -32,17 +40,22 @@
ch.emit(:error, ex)
end
end
def close
+ @resolver.close unless @resolver.closed?
@channels.each(&:close)
next_tick until @channels.empty?
end
def build_channel(uri, **options)
channel = Channel.by(uri, @options.merge(options))
- register_channel(channel)
+ resolve_channel(channel)
+ channel.once(:unreachable) do
+ @resolver.uncache(channel)
+ resolve_channel(channel)
+ end
channel
end
# opens a channel to the IP reachable through +uri+.
# Many hostnames are reachable through the same IP, so we try to
@@ -54,16 +67,71 @@
end
end
private
+ def resolve_channel(channel)
+ @channels << channel unless @channels.include?(channel)
+ @resolver << channel
+ return if @resolver.empty?
+ @_resolver_monitor ||= begin # rubocop:disable Naming/MemoizedInstanceVariableName
+ monitor = @selector.register(@resolver, :w)
+ monitor.value = @resolver
+ monitor
+ end
+ end
+
+ def on_resolver_channel(channel, addresses)
+ found_channel = @channels.find do |ch|
+ ch != channel && ch.mergeable?(addresses)
+ end
+ return register_channel(channel) unless found_channel
+ if found_channel.state == :open
+ coalesce_channels(found_channel, channel)
+ else
+ found_channel.once(:open) do
+ coalesce_channels(found_channel, channel)
+ end
+ end
+ end
+
+ def on_resolver_error(ch, error)
+ ch.emit(:error, error)
+ # must remove channel by hand, hasn't been started yet
+ unregister_channel(ch)
+ end
+
+ def on_resolver_close
+ @selector.deregister(@resolver)
+ @_resolver_monitor = nil
+ @resolver.close unless @resolver.closed?
+ end
+
def register_channel(channel)
monitor = @selector.register(channel, :w)
monitor.value = channel
channel.on(:close) do
- @channels.delete(channel)
- @selector.deregister(channel)
+ unregister_channel(channel)
end
- @channels << channel
+ end
+
+ def unregister_channel(channel)
+ @channels.delete(channel)
+ @selector.deregister(channel)
+ end
+
+ def next_timeout
+ timeout = @timeout.timeout # force log time
+ return (@resolver.timeout || timeout) unless @resolver.closed?
+ timeout
+ end
+
+ def coalesce_channels(ch1, ch2)
+ if ch1.coalescable?(ch2)
+ ch1.merge(ch2)
+ @channels.delete(ch2)
+ else
+ register_channel(ch2)
+ end
end
end
end