lib/httpx/connection.rb in httpx-0.2.1 vs lib/httpx/connection.rb in httpx-0.3.0

- old
+ new

@@ -11,10 +11,11 @@ @timeout = options.timeout resolver_type = @options.resolver_class resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol) @selector = Selector.new @channels = [] + @connected_channels = 0 @resolver = resolver_type.new(self, @options) @resolver.on(:resolve, &method(:on_resolver_channel)) @resolver.on(:error, &method(:on_resolver_error)) @resolver.on(:close, &method(:on_resolver_close)) end @@ -30,12 +31,15 @@ channel.call end monitor.interests = channel.interests end end - rescue TimeoutError, - Errno::ECONNRESET, + rescue TimeoutError => timeout_error + @channels.each do |ch| + ch.handle_timeout_error(timeout_error) + end + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::EPIPE => ex @channels.each do |ch| ch.emit(:error, ex) end @@ -48,10 +52,17 @@ end def build_channel(uri, **options) channel = Channel.by(uri, @options.merge(options)) resolve_channel(channel) + channel.on(:open) do + @connected_channels += 1 + @timeout.transition(:open) if @channels.size == @connected_channels + end + channel.on(:reset) do + @timeout.transition(:idle) + end channel.once(:unreachable) do @resolver.uncache(channel) resolve_channel(channel) end channel @@ -105,33 +116,35 @@ @_resolver_monitor = nil @resolver.close unless @resolver.closed? end def register_channel(channel) + @timeout.transition(:idle) monitor = @selector.register(channel, :w) monitor.value = channel channel.on(:close) do unregister_channel(channel) end end def unregister_channel(channel) @channels.delete(channel) @selector.deregister(channel) + @connected_channels -= 1 end - def next_timeout - timeout = @timeout.timeout # force log time - return (@resolver.timeout || timeout) unless @resolver.closed? - timeout - end - def coalesce_channels(ch1, ch2) if ch1.coalescable?(ch2) ch1.merge(ch2) @channels.delete(ch2) else register_channel(ch2) end + end + + def next_timeout + timeout = @timeout.timeout + return (@resolver.timeout || timeout) unless @resolver.closed? + timeout end end end