lib/httpx/connection.rb in httpx-0.7.0 vs lib/httpx/connection.rb in httpx-0.8.0

- old
+ new

@@ -65,10 +65,14 @@ @io = IO.registry(@type).new(@origin, nil, @options) parser else transition(:idle) end + + @inflight = 0 + @keep_alive_timeout = options.timeout.keep_alive_timeout + @keep_alive_timer = nil end # this is a semi-private method, to be used by the resolver # to initiate the io object. def addresses=(addrs) @@ -82,10 +86,12 @@ def match?(uri, options) return false if @state == :closing || @state == :closed return false if exhausted? + return false if @keep_alive_timer && @keep_alive_timer.fires_in.negative? + ( ( @origins.include?(uri.origin) && # if there is more than one origin to match, it means that this connection # was the result of coalescing. To prevent blind trust in the case where the @@ -99,10 +105,12 @@ def mergeable?(connection) return false if @state == :closing || @state == :closed || !@io return false if exhausted? + return false if @keep_alive_timer && @keep_alive_timer.fires_in.negative? + !(@io.addresses & connection.addresses).empty? && @options == connection.options end # coalescable connections need to be mergeable! # but internally, #mergeable? is called before #coalescable? @@ -138,11 +146,14 @@ end end def purge_pending pendings = [] - pendings << @parser.pending if @parser + if @parser + @inflight -= @parser.pending.size + pendings << @parser.pending + end pendings << @pending pendings.each do |pending| pending.reject! do |request| yield request end @@ -166,26 +177,49 @@ def inflight? @parser && !@parser.empty? && !@write_buffer.empty? end def interests - return :w if @state == :idle + # connecting + if connecting? + connect - :rw + return @io.interests if connecting? + end + + # if the write buffer is full, we drain it + return :w if @write_buffer.full? + + return @parser.interests if @parser + + nil end def to_io + @io.to_io + end + + def call case @state - when :idle - transition(:open) + when :closed + return + when :closing + consume + transition(:closed) + emit(:close) + when :open + consume end - @io.to_io + nil end def close @parser.close if @parser - transition(:closing) + return unless @keep_alive_timer + + @keep_alive_timer.cancel + remove_instance_variable(:@keep_alive_timer) end def reset transition(:closing) transition(:closed) @@ -193,88 +227,116 @@ end def send(request) if @parser && !@write_buffer.full? request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri) + @inflight += 1 + @keep_alive_timer.pause if @keep_alive_timer parser.send(request) else @pending << request end end - def call - case @state - when :closed - return - when :closing - dwrite - transition(:closed) - emit(:close) - when :open - consume - end - nil - end - def timeout return @timeout if defined?(@timeout) return @options.timeout.connect_timeout if @state == :idle @options.timeout.operation_timeout end private + def connect + transition(:open) + end + def exhausted? @parser && parser.exhausted? end def consume catch(:called) do - dread - dwrite - parser.consume - end - end + loop do + parser.consume - def dread(wsize = @window_size) - loop do - siz = @io.read(wsize, @read_buffer) - unless siz - ex = EOFError.new("descriptor closed") - ex.set_backtrace(caller) - on_error(ex) - return - end - return if siz.zero? + # we exit if there's no more data to process + if @pending.size.zero? && @inflight.zero? + log(level: 3) { "NO MORE REQUESTS..." } + return + end - log { "READ: #{siz} bytes..." } - parser << @read_buffer.to_s - return if @state == :closing || @state == :closed - end - end + @timeout = @current_timeout - def dwrite - loop do - return if @write_buffer.empty? + read_drained = false + write_drained = nil - siz = @io.write(@write_buffer) - unless siz - ex = EOFError.new("descriptor closed") - ex.set_backtrace(caller) - on_error(ex) - return + # dread + loop do + siz = @io.read(@window_size, @read_buffer) + unless siz + ex = EOFError.new("descriptor closed") + ex.set_backtrace(caller) + on_error(ex) + return + end + + log { "READ: #{siz} bytes..." } + + if siz.zero? + read_drained = @read_buffer.empty? + break + end + + parser << @read_buffer.to_s + + break if @state == :closing || @state == :closed + + # for HTTP/2, we just want to write goaway frame + end unless @state == :closing + + # dwrite + loop do + if @write_buffer.empty? + # we only mark as drained on the first loop + write_drained = write_drained.nil? && @inflight.positive? + break + end + + siz = @io.write(@write_buffer) + unless siz + ex = EOFError.new("descriptor closed") + ex.set_backtrace(caller) + on_error(ex) + return + end + log { "WRITE: #{siz} bytes..." } + + if siz.zero? + write_drained = !@write_buffer.empty? + break + end + + break if @state == :closing || @state == :closed + + write_drained = false + end + + # return if socket is drained + if read_drained && write_drained + log(level: 3) { "WAITING FOR EVENTS..." } + return + end end - log { "WRITE: #{siz} bytes..." } - return if siz.zero? - return if @state == :closing || @state == :closed end end def send_pending while !@write_buffer.full? && (request = @pending.shift) + @inflight += 1 + @keep_alive_timer.pause if @keep_alive_timer parser.send(request) end end def parser @@ -290,10 +352,11 @@ def set_parser_callbacks(parser) parser.on(:response) do |request, response| AltSvc.emit(request, response) do |alt_origin, origin, alt_params| emit(:altsvc, alt_origin, origin, alt_params) end + handle_response request.emit(:response, response) end parser.on(:altsvc) do |alt_origin, origin, alt_params| emit(:altsvc, alt_origin, origin, alt_params) end @@ -305,18 +368,25 @@ emit(:exhausted) end parser.on(:origin) do |origin| @origins << origin end - parser.on(:close) do + parser.on(:close) do |force| transition(:closing) + if force + transition(:closed) + emit(:close) + end end parser.on(:reset) do - transition(:closing) - unless parser.empty? + if parser.empty? + reset + else + transition(:closing) transition(:closed) emit(:reset) + @parser.reset if @parser transition(:idle) transition(:open) end end parser.on(:timeout) do |tout| @@ -333,19 +403,24 @@ end end def transition(nextstate) case nextstate + when :idle + @timeout = @current_timeout = @options.timeout.connect_timeout + when :open return if @state == :closed total_timeout @io.connect return unless @io.connected? send_pending + + @timeout = @current_timeout = @options.timeout.operation_timeout emit(:open) when :closing return unless @state == :open when :closed return unless @state == :closing @@ -356,10 +431,15 @@ remove_instance_variable(:@total_timeout) end @io.close @read_buffer.clear + if @keep_alive_timer + @keep_alive_timer.cancel + remove_instance_variable(:@keep_alive_timer) + end + remove_instance_variable(:@timeout) if defined?(@timeout) when :already_open nextstate = :open send_pending end @@ -377,25 +457,47 @@ handle_error(e) @state = :closed emit(:close) end - def on_error(ex) - handle_error(ex) - reset + def handle_response + @inflight -= 1 + return unless @inflight.zero? + + if @keep_alive_timer + @keep_alive_timer.resume + @keep_alive_timer.reset + else + @keep_alive_timer = @timers.after(@keep_alive_timeout) do + unless @inflight.zero? + log { "(#{object_id})) keep alive timeout expired, closing..." } + reset + end + end + end end - def handle_error(error) + def on_error(error) if error.instance_of?(TimeoutError) if @timeout @timeout -= error.timeout return unless @timeout <= 0 end - error = error.to_connection_error if connecting? + if @total_timeout && @total_timeout.fires_in.negative? + ex = TotalTimeoutError.new(@total_timeout.interval, "Timed out after #{@total_timeout.interval} seconds") + ex.set_backtrace(error.backtrace) + error = ex + elsif connecting? + error = error.to_connection_error + end end + handle_error(error) + reset + end + def handle_error(error) parser.handle_error(error) if @parser && parser.respond_to?(:handle_error) while (request = @pending.shift) request.emit(:response, ErrorResponse.new(request, error, @options)) end end @@ -406,11 +508,11 @@ return unless total @total_timeout ||= @timers.after(total) do ex = TotalTimeoutError.new(total, "Timed out after #{total} seconds") ex.set_backtrace(caller) - @parser.close if @parser on_error(ex) + @parser.close if @parser end end end end