lib/httpx/connection.rb in httpx-0.11.3 vs lib/httpx/connection.rb in httpx-0.12.0
- old
+ new
@@ -168,11 +168,11 @@
return @io.interests if connecting?
end
# if the write buffer is full, we drain it
- return :w if @write_buffer.full?
+ return :w unless @write_buffer.empty?
return @parser.interests if @parser
nil
end
@@ -249,77 +249,125 @@
@parser && parser.exhausted?
end
def consume
catch(:called) do
+ epiped = false
loop do
parser.consume
- # we exit if there's no more data to process
- if @pending.size.zero? && @inflight.zero?
+ # we exit if there's no more requests to process
+ #
+ # this condition takes into account:
+ #
+ # * the number of inflight requests
+ # * the number of pending requests
+ # * whether the write buffer has bytes (i.e. for close handshake)
+ if @pending.size.zero? && @inflight.zero? && @write_buffer.empty?
log(level: 3) { "NO MORE REQUESTS..." }
return
end
@timeout = @current_timeout
read_drained = false
write_drained = nil
- # dread
+ #
+ # tight read loop.
+ #
+ # read as much of the socket as possible.
+ #
+ # this tight loop reads all the data it can from the socket and pipes it to
+ # its parser.
+ #
loop do
siz = @io.read(@window_size, @read_buffer)
+ log(level: 3, color: :cyan) { "IO READ: #{siz} bytes..." }
unless siz
ex = EOFError.new("descriptor closed")
ex.set_backtrace(caller)
on_error(ex)
return
end
+ # socket has been drained. mark and exit the read loop.
if siz.zero?
read_drained = @read_buffer.empty?
+ epiped = false
break
end
parser << @read_buffer.to_s
+ # continue reading if possible.
+ break if interests == :w && !epiped
+
+ # exit the read loop if connection is preparing to be closed
break if @state == :closing || @state == :closed
- # for HTTP/2, we just want to write goaway frame
- end unless @state == :closing
+ # exit #consume altogether if all outstanding requests have been dealt with
+ return if @pending.size.zero? && @inflight.zero?
+ end unless (interests == :w || @state == :closing) && !epiped
- # dwrite
+ #
+ # tight write loop.
+ #
+ # flush as many bytes as the sockets allow.
+ #
loop do
+ # buffer has been drainned, mark and exit the write loop.
if @write_buffer.empty?
# we only mark as drained on the first loop
write_drained = write_drained.nil? && @inflight.positive?
+
break
end
- siz = @io.write(@write_buffer)
+ begin
+ siz = @io.write(@write_buffer)
+ rescue Errno::EPIPE
+ # this can happen if we still have bytes in the buffer to send to the server, but
+ # the server wants to respond immediately with some message, or an error. An example is
+ # when one's uploading a big file to an unintended endpoint, and the server stops the
+ # consumption, and responds immediately with an authorization of even method not allowed error.
+ # at this point, we have to let the connection switch to read-mode.
+ log(level: 2) { "pipe broken, could not flush buffer..." }
+ epiped = true
+ read_drained = false
+ break
+ end
+ log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." }
unless siz
ex = EOFError.new("descriptor closed")
ex.set_backtrace(caller)
on_error(ex)
return
end
+ # socket closed for writing. mark and exit the write loop.
if siz.zero?
write_drained = !@write_buffer.empty?
break
end
- break if @state == :closing || @state == :closed
+ # exit write loop if marked to consume from peer, or is closing.
+ break if interests == :r || @state == :closing || @state == :closed
write_drained = false
- end
+ end unless interests == :r
# return if socket is drained
- if read_drained && write_drained
- log(level: 3) { "WAITING FOR EVENTS..." }
- return
- end
+ next unless (interests != :r || read_drained) &&
+ (interests != :w || write_drained)
+
+ # gotta go back to the event loop. It happens when:
+ #
+ # * the socket is drained of bytes or it's not the interest of the conn to read;
+ # * theres nothing more to write, or it's not in the interest of the conn to write;
+ log(level: 3) { "(#{interests}): WAITING FOR EVENTS..." }
+ return
end
end
end
def send_pending
@@ -442,10 +490,10 @@
reset
emit(:unreachable)
throw(:jump_tick)
rescue Errno::ECONNREFUSED,
Errno::EADDRNOTAVAIL,
- OpenSSL::SSL::SSLError => e
+ TLSError => e
# connect errors, exit gracefully
handle_error(e)
@state = :closed
emit(:close)
end