lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-6.8.1 vs lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-6.8.2
- old
+ new
@@ -1,5 +1,9 @@
+# Licensed to Elasticsearch B.V under one or more agreements.
+# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+# See the LICENSE file in the project root for more information
+
module Elasticsearch
module Transport
module Transport
# @abstract Module with common functionality for transport implementations.
@@ -14,11 +18,11 @@
SANITIZED_PASSWORD = '*' * (rand(14)+1)
attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol
attr_accessor :serializer, :sniffer, :logger, :tracer,
:reload_connections, :reload_after,
- :resurrect_after, :max_retries
+ :resurrect_after
# Creates a new transport object
#
# @param arguments [Hash] Settings and options for the transport
# @param block [Proc] Lambda or Proc which can be evaluated in the context of the "session" object
@@ -50,11 +54,10 @@
@counter_mtx = Mutex.new
@last_request_at = Time.now
@reload_connections = options[:reload_connections]
@reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
@resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
- @max_retries = options[:retry_on_failure].is_a?(Integer) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES
@retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
end
# Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.
#
@@ -242,15 +245,22 @@
# @return [Response]
# @raise [NoMethodError] If no block is passed
# @raise [ServerError] If request failed on server
# @raise [Error] If no connection is available
#
- def perform_request(method, path, params={}, body=nil, headers=nil, &block)
+ def perform_request(method, path, params={}, body=nil, headers=nil, opts={}, &block)
raise NoMethodError, "Implement this method in your transport class" unless block_given?
start = Time.now if logger || tracer
tries = 0
+ reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
+ max_retries = if opts.key?(:retry_on_failure)
+ opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
+ elsif options.key?(:retry_on_failure)
+ options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
+ end
+
params = params.clone
ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }
begin
@@ -269,13 +279,13 @@
# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
rescue Elasticsearch::Transport::Transport::ServerError => e
- if @retry_on_status.include?(response.status)
+ if response && @retry_on_status.include?(response.status)
logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
- if tries <= max_retries
+ if tries <= (max_retries || DEFAULT_MAX_RETRIES)
retry
else
logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
raise e
end
@@ -286,15 +296,15 @@
rescue *host_unreachable_exceptions => e
logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger
connection.dead!
- if @options[:reload_on_failure] and tries < connections.all.size
+ if reload_on_failure and tries < connections.all.size
logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
reload_connections! and retry
end
- if @options[:retry_on_failure]
+ if max_retries
logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
if tries <= max_retries
retry
else
logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger