lib/httpx/connection.rb in httpx-0.7.0 vs lib/httpx/connection.rb in httpx-0.8.0
- old
+ new
@@ -65,10 +65,14 @@
@io = IO.registry(@type).new(@origin, nil, @options)
parser
else
transition(:idle)
end
+
+ @inflight = 0
+ @keep_alive_timeout = options.timeout.keep_alive_timeout
+ @keep_alive_timer = nil
end
# this is a semi-private method, to be used by the resolver
# to initiate the io object.
def addresses=(addrs)
@@ -82,10 +86,12 @@
def match?(uri, options)
return false if @state == :closing || @state == :closed
return false if exhausted?
+ return false if @keep_alive_timer && @keep_alive_timer.fires_in.negative?
+
(
(
@origins.include?(uri.origin) &&
# if there is more than one origin to match, it means that this connection
# was the result of coalescing. To prevent blind trust in the case where the
@@ -99,10 +105,12 @@
def mergeable?(connection)
return false if @state == :closing || @state == :closed || !@io
return false if exhausted?
+ return false if @keep_alive_timer && @keep_alive_timer.fires_in.negative?
+
!(@io.addresses & connection.addresses).empty? && @options == connection.options
end
# coalescable connections need to be mergeable!
# but internally, #mergeable? is called before #coalescable?
@@ -138,11 +146,14 @@
end
end
def purge_pending
pendings = []
- pendings << @parser.pending if @parser
+ if @parser
+ @inflight -= @parser.pending.size
+ pendings << @parser.pending
+ end
pendings << @pending
pendings.each do |pending|
pending.reject! do |request|
yield request
end
@@ -166,26 +177,49 @@
def inflight?
@parser && !@parser.empty? && !@write_buffer.empty?
end
def interests
- return :w if @state == :idle
+ # connecting
+ if connecting?
+ connect
- :rw
+ return @io.interests if connecting?
+ end
+
+ # if the write buffer is full, we drain it
+ return :w if @write_buffer.full?
+
+ return @parser.interests if @parser
+
+ nil
end
def to_io
+ @io.to_io
+ end
+
+ def call
case @state
- when :idle
- transition(:open)
+ when :closed
+ return
+ when :closing
+ consume
+ transition(:closed)
+ emit(:close)
+ when :open
+ consume
end
- @io.to_io
+ nil
end
def close
@parser.close if @parser
- transition(:closing)
+ return unless @keep_alive_timer
+
+ @keep_alive_timer.cancel
+ remove_instance_variable(:@keep_alive_timer)
end
def reset
transition(:closing)
transition(:closed)
@@ -193,88 +227,116 @@
end
def send(request)
if @parser && !@write_buffer.full?
request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri)
+ @inflight += 1
+ @keep_alive_timer.pause if @keep_alive_timer
parser.send(request)
else
@pending << request
end
end
- def call
- case @state
- when :closed
- return
- when :closing
- dwrite
- transition(:closed)
- emit(:close)
- when :open
- consume
- end
- nil
- end
-
def timeout
return @timeout if defined?(@timeout)
return @options.timeout.connect_timeout if @state == :idle
@options.timeout.operation_timeout
end
private
+ def connect
+ transition(:open)
+ end
+
def exhausted?
@parser && parser.exhausted?
end
def consume
catch(:called) do
- dread
- dwrite
- parser.consume
- end
- end
+ loop do
+ parser.consume
- def dread(wsize = @window_size)
- loop do
- siz = @io.read(wsize, @read_buffer)
- unless siz
- ex = EOFError.new("descriptor closed")
- ex.set_backtrace(caller)
- on_error(ex)
- return
- end
- return if siz.zero?
+ # we exit if there's no more data to process
+ if @pending.size.zero? && @inflight.zero?
+ log(level: 3) { "NO MORE REQUESTS..." }
+ return
+ end
- log { "READ: #{siz} bytes..." }
- parser << @read_buffer.to_s
- return if @state == :closing || @state == :closed
- end
- end
+ @timeout = @current_timeout
- def dwrite
- loop do
- return if @write_buffer.empty?
+ read_drained = false
+ write_drained = nil
- siz = @io.write(@write_buffer)
- unless siz
- ex = EOFError.new("descriptor closed")
- ex.set_backtrace(caller)
- on_error(ex)
- return
+ # dread
+ loop do
+ siz = @io.read(@window_size, @read_buffer)
+ unless siz
+ ex = EOFError.new("descriptor closed")
+ ex.set_backtrace(caller)
+ on_error(ex)
+ return
+ end
+
+ log { "READ: #{siz} bytes..." }
+
+ if siz.zero?
+ read_drained = @read_buffer.empty?
+ break
+ end
+
+ parser << @read_buffer.to_s
+
+ break if @state == :closing || @state == :closed
+
+ # for HTTP/2, we just want to write goaway frame
+ end unless @state == :closing
+
+ # dwrite
+ loop do
+ if @write_buffer.empty?
+ # we only mark as drained on the first loop
+ write_drained = write_drained.nil? && @inflight.positive?
+ break
+ end
+
+ siz = @io.write(@write_buffer)
+ unless siz
+ ex = EOFError.new("descriptor closed")
+ ex.set_backtrace(caller)
+ on_error(ex)
+ return
+ end
+ log { "WRITE: #{siz} bytes..." }
+
+ if siz.zero?
+ write_drained = !@write_buffer.empty?
+ break
+ end
+
+ break if @state == :closing || @state == :closed
+
+ write_drained = false
+ end
+
+ # return if socket is drained
+ if read_drained && write_drained
+ log(level: 3) { "WAITING FOR EVENTS..." }
+ return
+ end
end
- log { "WRITE: #{siz} bytes..." }
- return if siz.zero?
- return if @state == :closing || @state == :closed
end
end
def send_pending
while !@write_buffer.full? && (request = @pending.shift)
+ @inflight += 1
+ @keep_alive_timer.pause if @keep_alive_timer
parser.send(request)
end
end
def parser
@@ -290,10 +352,11 @@
def set_parser_callbacks(parser)
parser.on(:response) do |request, response|
AltSvc.emit(request, response) do |alt_origin, origin, alt_params|
emit(:altsvc, alt_origin, origin, alt_params)
end
+ handle_response
request.emit(:response, response)
end
parser.on(:altsvc) do |alt_origin, origin, alt_params|
emit(:altsvc, alt_origin, origin, alt_params)
end
@@ -305,18 +368,25 @@
emit(:exhausted)
end
parser.on(:origin) do |origin|
@origins << origin
end
- parser.on(:close) do
+ parser.on(:close) do |force|
transition(:closing)
+ if force
+ transition(:closed)
+ emit(:close)
+ end
end
parser.on(:reset) do
- transition(:closing)
- unless parser.empty?
+ if parser.empty?
+ reset
+ else
+ transition(:closing)
transition(:closed)
emit(:reset)
+ @parser.reset if @parser
transition(:idle)
transition(:open)
end
end
parser.on(:timeout) do |tout|
@@ -333,19 +403,24 @@
end
end
def transition(nextstate)
case nextstate
+ when :idle
+ @timeout = @current_timeout = @options.timeout.connect_timeout
+
when :open
return if @state == :closed
total_timeout
@io.connect
return unless @io.connected?
send_pending
+
+ @timeout = @current_timeout = @options.timeout.operation_timeout
emit(:open)
when :closing
return unless @state == :open
when :closed
return unless @state == :closing
@@ -356,10 +431,15 @@
remove_instance_variable(:@total_timeout)
end
@io.close
@read_buffer.clear
+ if @keep_alive_timer
+ @keep_alive_timer.cancel
+ remove_instance_variable(:@keep_alive_timer)
+ end
+
remove_instance_variable(:@timeout) if defined?(@timeout)
when :already_open
nextstate = :open
send_pending
end
@@ -377,25 +457,47 @@
handle_error(e)
@state = :closed
emit(:close)
end
- def on_error(ex)
- handle_error(ex)
- reset
+ def handle_response
+ @inflight -= 1
+ return unless @inflight.zero?
+
+ if @keep_alive_timer
+ @keep_alive_timer.resume
+ @keep_alive_timer.reset
+ else
+ @keep_alive_timer = @timers.after(@keep_alive_timeout) do
+ unless @inflight.zero?
+ log { "(#{object_id})) keep alive timeout expired, closing..." }
+ reset
+ end
+ end
+ end
end
- def handle_error(error)
+ def on_error(error)
if error.instance_of?(TimeoutError)
if @timeout
@timeout -= error.timeout
return unless @timeout <= 0
end
- error = error.to_connection_error if connecting?
+ if @total_timeout && @total_timeout.fires_in.negative?
+ ex = TotalTimeoutError.new(@total_timeout.interval, "Timed out after #{@total_timeout.interval} seconds")
+ ex.set_backtrace(error.backtrace)
+ error = ex
+ elsif connecting?
+ error = error.to_connection_error
+ end
end
+ handle_error(error)
+ reset
+ end
+ def handle_error(error)
parser.handle_error(error) if @parser && parser.respond_to?(:handle_error)
while (request = @pending.shift)
request.emit(:response, ErrorResponse.new(request, error, @options))
end
end
@@ -406,11 +508,11 @@
return unless total
@total_timeout ||= @timers.after(total) do
ex = TotalTimeoutError.new(total, "Timed out after #{total} seconds")
ex.set_backtrace(caller)
- @parser.close if @parser
on_error(ex)
+ @parser.close if @parser
end
end
end
end