lib/httpx/connection.rb in httpx-0.20.5 vs lib/httpx/connection.rb in httpx-0.21.0
- old
+ new
@@ -32,10 +32,11 @@
include Registry
include Loggable
include Callbacks
using URIExtensions
+ using NumericExtensions
require "httpx/connection/http2"
require "httpx/connection/http1"
BUFFER_SIZE = 1 << 14
@@ -231,10 +232,11 @@
if @response_received_at && @keep_alive_timeout &&
Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
# when pushing a request into an existing connection, we have to check whether there
# is the possibility that the connection might have extended the keep alive timeout.
# for such cases, we want to ping for availability before deciding to shovel requests.
+ log(level: 3) { "keep alive timeout expired, pinging connection..." }
@pending << request
parser.ping
transition(:active) if @state == :inactive
return
end
@@ -428,10 +430,12 @@
def send_request_to_parser(request)
@inflight += 1
parser.send(request)
+ set_request_timeouts(request)
+
return unless @state == :inactive
transition(:active)
end
@@ -571,11 +575,11 @@
ex = TotalTimeoutError.new(@total_timeout, "Timed out after #{@total_timeout} seconds")
ex.set_backtrace(error.backtrace)
error = ex
else
# inactive connections do not contribute to the select loop, therefore
- # they should fail due to such errors.
+ # they should not fail due to such errors.
return if @state == :inactive
if @timeout
@timeout -= error.timeout
return unless @timeout <= 0
@@ -589,12 +593,47 @@
end
def handle_error(error)
parser.handle_error(error) if @parser && parser.respond_to?(:handle_error)
while (request = @pending.shift)
- response = ErrorResponse.new(request, error, @options)
+ response = ErrorResponse.new(request, error, request.options)
request.response = response
request.emit(:response, response)
end
+ end
+
+ def set_request_timeouts(request)
+ write_timeout = request.write_timeout
+ request.once(:headers) do
+ @timers.after(write_timeout) { write_timeout_callback(request, write_timeout) }
+ end unless write_timeout.nil? || write_timeout.infinite?
+
+ read_timeout = request.read_timeout
+ request.once(:done) do
+ @timers.after(read_timeout) { read_timeout_callback(request, read_timeout) }
+ end unless read_timeout.nil? || read_timeout.infinite?
+
+ request_timeout = request.request_timeout
+ request.once(:headers) do
+ @timers.after(request_timeout) { read_timeout_callback(request, request_timeout, RequestTimeoutError) }
+ end unless request_timeout.nil? || request_timeout.infinite?
+ end
+
+ def write_timeout_callback(request, write_timeout)
+ return if request.state == :done
+
+ @write_buffer.clear
+ error = WriteTimeoutError.new(request, nil, write_timeout)
+ on_error(error)
+ end
+
+ def read_timeout_callback(request, read_timeout, error_type = ReadTimeoutError)
+ response = request.response
+
+ return if response && response.finished?
+
+ @write_buffer.clear
+ error = error_type.new(request, request.response, read_timeout)
+ on_error(error)
end
end
end