lib/httpx/connection.rb in httpx-0.2.1 vs lib/httpx/connection.rb in httpx-0.3.0
- old
+ new
@@ -11,10 +11,11 @@
@timeout = options.timeout
resolver_type = @options.resolver_class
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
@selector = Selector.new
@channels = []
+ @connected_channels = 0
@resolver = resolver_type.new(self, @options)
@resolver.on(:resolve, &method(:on_resolver_channel))
@resolver.on(:error, &method(:on_resolver_error))
@resolver.on(:close, &method(:on_resolver_close))
end
@@ -30,12 +31,15 @@
channel.call
end
monitor.interests = channel.interests
end
end
- rescue TimeoutError,
- Errno::ECONNRESET,
+ rescue TimeoutError => timeout_error
+ @channels.each do |ch|
+ ch.handle_timeout_error(timeout_error)
+ end
+ rescue Errno::ECONNRESET,
Errno::ECONNABORTED,
Errno::EPIPE => ex
@channels.each do |ch|
ch.emit(:error, ex)
end
@@ -48,10 +52,17 @@
end
def build_channel(uri, **options)
channel = Channel.by(uri, @options.merge(options))
resolve_channel(channel)
+ channel.on(:open) do
+ @connected_channels += 1
+ @timeout.transition(:open) if @channels.size == @connected_channels
+ end
+ channel.on(:reset) do
+ @timeout.transition(:idle)
+ end
channel.once(:unreachable) do
@resolver.uncache(channel)
resolve_channel(channel)
end
channel
@@ -105,33 +116,35 @@
@_resolver_monitor = nil
@resolver.close unless @resolver.closed?
end
def register_channel(channel)
+ @timeout.transition(:idle)
monitor = @selector.register(channel, :w)
monitor.value = channel
channel.on(:close) do
unregister_channel(channel)
end
end
def unregister_channel(channel)
@channels.delete(channel)
@selector.deregister(channel)
+ @connected_channels -= 1
end
- def next_timeout
- timeout = @timeout.timeout # force log time
- return (@resolver.timeout || timeout) unless @resolver.closed?
- timeout
- end
-
def coalesce_channels(ch1, ch2)
if ch1.coalescable?(ch2)
ch1.merge(ch2)
@channels.delete(ch2)
else
register_channel(ch2)
end
+ end
+
+ def next_timeout
+ timeout = @timeout.timeout
+ return (@resolver.timeout || timeout) unless @resolver.closed?
+ timeout
end
end
end