lib/httpx/connection.rb in httpx-0.20.5 vs lib/httpx/connection.rb in httpx-0.21.0

- old
+ new

@@ -32,10 +32,11 @@ include Registry include Loggable include Callbacks using URIExtensions + using NumericExtensions require "httpx/connection/http2" require "httpx/connection/http1" BUFFER_SIZE = 1 << 14 @@ -231,10 +232,11 @@ if @response_received_at && @keep_alive_timeout && Utils.elapsed_time(@response_received_at) > @keep_alive_timeout # when pushing a request into an existing connection, we have to check whether there # is the possibility that the connection might have extended the keep alive timeout. # for such cases, we want to ping for availability before deciding to shovel requests. + log(level: 3) { "keep alive timeout expired, pinging connection..." } @pending << request parser.ping transition(:active) if @state == :inactive return end @@ -428,10 +430,12 @@ def send_request_to_parser(request) @inflight += 1 parser.send(request) + set_request_timeouts(request) + return unless @state == :inactive transition(:active) end @@ -571,11 +575,11 @@ ex = TotalTimeoutError.new(@total_timeout, "Timed out after #{@total_timeout} seconds") ex.set_backtrace(error.backtrace) error = ex else # inactive connections do not contribute to the select loop, therefore - # they should fail due to such errors. + # they should not fail due to such errors. return if @state == :inactive if @timeout @timeout -= error.timeout return unless @timeout <= 0 @@ -589,12 +593,47 @@ end def handle_error(error) parser.handle_error(error) if @parser && parser.respond_to?(:handle_error) while (request = @pending.shift) - response = ErrorResponse.new(request, error, @options) + response = ErrorResponse.new(request, error, request.options) request.response = response request.emit(:response, response) end + end + + def set_request_timeouts(request) + write_timeout = request.write_timeout + request.once(:headers) do + @timers.after(write_timeout) { write_timeout_callback(request, write_timeout) } + end unless write_timeout.nil? || write_timeout.infinite? + + read_timeout = request.read_timeout + request.once(:done) do + @timers.after(read_timeout) { read_timeout_callback(request, read_timeout) } + end unless read_timeout.nil? || read_timeout.infinite? + + request_timeout = request.request_timeout + request.once(:headers) do + @timers.after(request_timeout) { read_timeout_callback(request, request_timeout, RequestTimeoutError) } + end unless request_timeout.nil? || request_timeout.infinite? + end + + def write_timeout_callback(request, write_timeout) + return if request.state == :done + + @write_buffer.clear + error = WriteTimeoutError.new(request, nil, write_timeout) + on_error(error) + end + + def read_timeout_callback(request, read_timeout, error_type = ReadTimeoutError) + response = request.response + + return if response && response.finished? + + @write_buffer.clear + error = error_type.new(request, request.response, read_timeout) + on_error(error) end end end