lib/httpx/connection.rb in httpx-0.6.7 vs lib/httpx/connection.rb in httpx-0.7.0

- old
+ new

@@ -44,10 +44,12 @@ def_delegator :@write_buffer, :empty? attr_reader :origin, :state, :pending, :options + attr_writer :timers + def initialize(type, uri, options) @type = type @origins = [uri.origin] @origin = URI(uri.origin) @options = Options.new(options) @@ -78,10 +80,12 @@ end def match?(uri, options) return false if @state == :closing || @state == :closed + return false if exhausted? + ( ( @origins.include?(uri.origin) && # if there is more than one origin to match, it means that this connection # was the result of coalescing. To prevent blind trust in the case where the @@ -93,10 +97,12 @@ end def mergeable?(connection) return false if @state == :closing || @state == :closed || !@io + return false if exhausted? + !(@io.addresses & connection.addresses).empty? && @options == connection.options end # coalescable connections need to be mergeable! # but internally, #mergeable? is called before #coalescable? @@ -108,14 +114,17 @@ else @origin == connection.origin end end + def create_idle + self.class.new(@type, @origin, @options) + end + def merge(connection) @origins += connection.instance_variable_get(:@origins) - pending = connection.instance_variable_get(:@pending) - pending.each do |req| + connection.purge_pending do |req| send(req) end end def unmerge(connection) @@ -128,11 +137,14 @@ end end end def purge_pending - [*@parser.pending, *@pending].each do |pending| + pendings = [] + pendings << @parser.pending if @parser + pendings << @pending + pendings.each do |pending| pending.reject! do |request| yield request end end end @@ -211,10 +223,14 @@ @options.timeout.operation_timeout end private + def exhausted? + @parser && parser.exhausted? + end + def consume catch(:called) do dread dwrite parser.consume @@ -283,10 +299,13 @@ end parser.on(:promise) do |request, stream| request.emit(:promise, parser, stream) end + parser.on(:exhausted) do + emit(:exhausted) + end parser.on(:origin) do |origin| @origins << origin end parser.on(:close) do transition(:closing) @@ -317,10 +336,12 @@ def transition(nextstate) case nextstate when :open return if @state == :closed + total_timeout + @io.connect return unless @io.connected? send_pending emit(:open) @@ -328,10 +349,15 @@ return unless @state == :open when :closed return unless @state == :closing return unless @write_buffer.empty? + if @total_timeout + @total_timeout.cancel + remove_instance_variable(:@total_timeout) + end + @io.close @read_buffer.clear remove_instance_variable(:@timeout) if defined?(@timeout) when :already_open nextstate = :open @@ -369,9 +395,22 @@ end parser.handle_error(error) if @parser && parser.respond_to?(:handle_error) while (request = @pending.shift) request.emit(:response, ErrorResponse.new(request, error, @options)) + end + end + + def total_timeout + total = @options.timeout.total_timeout + + return unless total + + @total_timeout ||= @timers.after(total) do + ex = TotalTimeoutError.new(total, "Timed out after #{total} seconds") + ex.set_backtrace(caller) + @parser.close if @parser + on_error(ex) end end end end