lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-6.8.1 vs lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-6.8.2

- old
+ new

@@ -1,5 +1,9 @@ +# Licensed to Elasticsearch B.V under one or more agreements. +# Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +# See the LICENSE file in the project root for more information + module Elasticsearch module Transport module Transport # @abstract Module with common functionality for transport implementations. @@ -14,11 +18,11 @@ SANITIZED_PASSWORD = '*' * (rand(14)+1) attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol attr_accessor :serializer, :sniffer, :logger, :tracer, :reload_connections, :reload_after, - :resurrect_after, :max_retries + :resurrect_after # Creates a new transport object # # @param arguments [Hash] Settings and options for the transport # @param block [Proc] Lambda or Proc which can be evaluated in the context of the "session" object @@ -50,11 +54,10 @@ @counter_mtx = Mutex.new @last_request_at = Time.now @reload_connections = options[:reload_connections] @reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER - @max_retries = options[:retry_on_failure].is_a?(Integer) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES @retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i } end # Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}. # @@ -242,15 +245,22 @@ # @return [Response] # @raise [NoMethodError] If no block is passed # @raise [ServerError] If request failed on server # @raise [Error] If no connection is available # - def perform_request(method, path, params={}, body=nil, headers=nil, &block) + def perform_request(method, path, params={}, body=nil, headers=nil, opts={}, &block) raise NoMethodError, "Implement this method in your transport class" unless block_given? start = Time.now if logger || tracer tries = 0 + reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure]) + max_retries = if opts.key?(:retry_on_failure) + opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure] + elsif options.key?(:retry_on_failure) + options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure] + end + params = params.clone ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i } begin @@ -269,13 +279,13 @@ # Raise an exception so we can catch it for `retry_on_status` __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) rescue Elasticsearch::Transport::Transport::ServerError => e - if @retry_on_status.include?(response.status) + if response && @retry_on_status.include?(response.status) logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger - if tries <= max_retries + if tries <= (max_retries || DEFAULT_MAX_RETRIES) retry else logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger raise e end @@ -286,15 +296,15 @@ rescue *host_unreachable_exceptions => e logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger connection.dead! - if @options[:reload_on_failure] and tries < connections.all.size + if reload_on_failure and tries < connections.all.size logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger reload_connections! and retry end - if @options[:retry_on_failure] + if max_retries logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger if tries <= max_retries retry else logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger