lib/httpx/connection.rb in httpx-0.11.3 vs lib/httpx/connection.rb in httpx-0.12.0

- old
+ new

@@ -168,11 +168,11 @@ return @io.interests if connecting? end # if the write buffer is full, we drain it - return :w if @write_buffer.full? + return :w unless @write_buffer.empty? return @parser.interests if @parser nil end @@ -249,77 +249,125 @@ @parser && parser.exhausted? end def consume catch(:called) do + epiped = false loop do parser.consume - # we exit if there's no more data to process - if @pending.size.zero? && @inflight.zero? + # we exit if there's no more requests to process + # + # this condition takes into account: + # + # * the number of inflight requests + # * the number of pending requests + # * whether the write buffer has bytes (i.e. for close handshake) + if @pending.size.zero? && @inflight.zero? && @write_buffer.empty? log(level: 3) { "NO MORE REQUESTS..." } return end @timeout = @current_timeout read_drained = false write_drained = nil - # dread + # + # tight read loop. + # + # read as much of the socket as possible. + # + # this tight loop reads all the data it can from the socket and pipes it to + # its parser. + # loop do siz = @io.read(@window_size, @read_buffer) + log(level: 3, color: :cyan) { "IO READ: #{siz} bytes..." } unless siz ex = EOFError.new("descriptor closed") ex.set_backtrace(caller) on_error(ex) return end + # socket has been drained. mark and exit the read loop. if siz.zero? read_drained = @read_buffer.empty? + epiped = false break end parser << @read_buffer.to_s + # continue reading if possible. + break if interests == :w && !epiped + + # exit the read loop if connection is preparing to be closed break if @state == :closing || @state == :closed - # for HTTP/2, we just want to write goaway frame - end unless @state == :closing + # exit #consume altogether if all outstanding requests have been dealt with + return if @pending.size.zero? && @inflight.zero? + end unless (interests == :w || @state == :closing) && !epiped - # dwrite + # + # tight write loop. + # + # flush as many bytes as the sockets allow. + # loop do + # buffer has been drainned, mark and exit the write loop. if @write_buffer.empty? # we only mark as drained on the first loop write_drained = write_drained.nil? && @inflight.positive? + break end - siz = @io.write(@write_buffer) + begin + siz = @io.write(@write_buffer) + rescue Errno::EPIPE + # this can happen if we still have bytes in the buffer to send to the server, but + # the server wants to respond immediately with some message, or an error. An example is + # when one's uploading a big file to an unintended endpoint, and the server stops the + # consumption, and responds immediately with an authorization of even method not allowed error. + # at this point, we have to let the connection switch to read-mode. + log(level: 2) { "pipe broken, could not flush buffer..." } + epiped = true + read_drained = false + break + end + log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." } unless siz ex = EOFError.new("descriptor closed") ex.set_backtrace(caller) on_error(ex) return end + # socket closed for writing. mark and exit the write loop. if siz.zero? write_drained = !@write_buffer.empty? break end - break if @state == :closing || @state == :closed + # exit write loop if marked to consume from peer, or is closing. + break if interests == :r || @state == :closing || @state == :closed write_drained = false - end + end unless interests == :r # return if socket is drained - if read_drained && write_drained - log(level: 3) { "WAITING FOR EVENTS..." } - return - end + next unless (interests != :r || read_drained) && + (interests != :w || write_drained) + + # gotta go back to the event loop. It happens when: + # + # * the socket is drained of bytes or it's not the interest of the conn to read; + # * theres nothing more to write, or it's not in the interest of the conn to write; + log(level: 3) { "(#{interests}): WAITING FOR EVENTS..." } + return end end end def send_pending @@ -442,10 +490,10 @@ reset emit(:unreachable) throw(:jump_tick) rescue Errno::ECONNREFUSED, Errno::EADDRNOTAVAIL, - OpenSSL::SSL::SSLError => e + TLSError => e # connect errors, exit gracefully handle_error(e) @state = :closed emit(:close) end