lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-6.8.3 vs lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-7.0.0.pre

- old
+ new

@@ -1,16 +1,31 @@
-# Licensed to Elasticsearch B.V under one or more agreements.
-# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
-# See the LICENSE file in the project root for more information
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.

 module Elasticsearch
   module Transport
     module Transport

       # @abstract Module with common functionality for transport implementations.
       #
       module Base
+        include Loggable
+
         DEFAULT_PORT = 9200
         DEFAULT_PROTOCOL = 'http'
         DEFAULT_RELOAD_AFTER = 10_000 # Requests
         DEFAULT_RESURRECT_AFTER = 60 # Seconds
         DEFAULT_MAX_RETRIES = 3 # Requests
@@ -18,11 +33,11 @@
         SANITIZED_PASSWORD = '*' * (rand(14)+1)

         attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol
         attr_accessor :serializer, :sniffer, :logger, :tracer,
                       :reload_connections, :reload_after,
-                      :resurrect_after
+                      :resurrect_after, :max_retries

         # Creates a new transport object
         #
         # @param arguments [Hash] Settings and options for the transport
         # @param block [Proc] Lambda or Proc which can be evaluated in the context of the "session" object
@@ -34,11 +49,11 @@
         #
         def initialize(arguments={}, &block)
           @state_mutex = Mutex.new

           @hosts = arguments[:hosts] || []
-          @options = arguments[:options] ? arguments[:options].dup : {}
+          @options = arguments[:options] || {}
           @options[:http] ||= {}
           @options[:retry_on_status] ||= []

           @block = block
           @connections = __build_connections
@@ -54,10 +69,11 @@
           @counter_mtx = Mutex.new
           @last_request_at = Time.now
           @reload_connections = options[:reload_connections]
           @reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
           @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
+          @max_retries = options[:retry_on_failure].is_a?(Integer) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES
           @retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
         end

         # Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.
         #
@@ -82,11 +98,11 @@
         def reload_connections!
           hosts = sniffer.hosts
           __rebuild_connections :hosts => hosts, :options => options
           self
         rescue SnifferTimeoutError
-          logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger
+          log_error "[SnifferTimeoutError] Timeout when reloading connections."
           self
         end

         # Tries to "resurrect" all eligible dead connections
         #
@@ -129,19 +145,19 @@
         # @api private
         #
         def __build_connections
           Connections::Collection.new \
             :connections => hosts.map { |host|
-              host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
-              host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
-              if (options[:user] || options[:http][:user]) && !host[:user]
-                host[:user] ||= options[:user] || options[:http][:user]
-                host[:password] ||= options[:password] || options[:http][:password]
-              end
+              host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
+              host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
+              if (options[:user] || options[:http][:user]) && !host[:user]
+                host[:user] ||= options[:user] || options[:http][:user]
+                host[:password] ||= options[:password] || options[:http][:password]
+              end

-              __build_connection(host, (options[:transport_options] || {}), @block)
-            },
+              __build_connection(host, (options[:transport_options] || {}), @block)
+            },
             :selector_class => options[:selector_class],
             :selector => options[:selector]
         end

         # @abstract Build and return a connection.
@@ -165,33 +181,27 @@

         # Log request and response information
         #
         # @api private
         #
-        def __log(method, path, params, body, url, response, json, took, duration)
-          sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
-          logger.info "#{method.to_s.upcase} #{sanitized_url} " +
-                      "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
-          logger.debug "> #{__convert_to_json(body)}" if body
-          logger.debug "< #{response.body}"
+        def __log_response(method, path, params, body, url, response, json, took, duration)
+          if logger
+            sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
+            log_info "#{method.to_s.upcase} #{sanitized_url} " +
+                     "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
+            log_debug "> #{__convert_to_json(body)}" if body
+            log_debug "< #{response.body}"
+          end
         end

-        # Log failed request
-        #
-        # @api private
-        #
-        def __log_failed(response)
-          logger.fatal "[#{response.status}] #{response.body}"
-        end
-
         # Trace the request in the `curl` format
         #
         # @api private
         #
         def __trace(method, path, params, headers, body, url, response, json, took, duration)
           trace_url = "http://localhost:9200/#{path}?pretty" +
-                      ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
+                      ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
           trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
           trace_command = "curl -X #{method.to_s.upcase}"
           trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty?
           trace_command += " '#{trace_url}'#{trace_body}\n"
           tracer.info trace_command
@@ -245,22 +255,15 @@
         # @return [Response]
         # @raise [NoMethodError] If no block is passed
         # @raise [ServerError] If request failed on server
         # @raise [Error] If no connection is available
         #
-        def perform_request(method, path, params={}, body=nil, headers=nil, opts={}, &block)
+        def perform_request(method, path, params={}, body=nil, headers=nil, &block)
           raise NoMethodError, "Implement this method in your transport class" unless block_given?

-          start = Time.now if logger || tracer
+          start = Time.now
           tries = 0
-          reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
-          max_retries = if opts.key?(:retry_on_failure)
-                          opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
-                        elsif options.key?(:retry_on_failure)
-                          options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
-                        end
-
           params = params.clone

           ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }

           begin
@@ -279,66 +282,71 @@
             # Raise an exception so we can catch it for `retry_on_status`
             __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)

           rescue Elasticsearch::Transport::Transport::ServerError => e
-            if response && @retry_on_status.include?(response.status)
-              logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
-              if tries <= (max_retries || DEFAULT_MAX_RETRIES)
+            if @retry_on_status.include?(response.status)
+              log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
+              if tries <= max_retries
                 retry
               else
-                logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
+                log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
                 raise e
               end
             else
               raise e
             end

           rescue *host_unreachable_exceptions => e
-            logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger
+            log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"

             connection.dead!

-            if reload_on_failure and tries < connections.all.size
-              logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
+            if @options[:reload_on_failure] and tries < connections.all.size
+              log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
              reload_connections! and retry
             end

-            if max_retries
-              logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
+            if @options[:retry_on_failure]
+              log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
               if tries <= max_retries
                 retry
               else
-                logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
+                log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
                 raise e
               end
             else
               raise e
             end

           rescue Exception => e
-            logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger
+            log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
             raise e

           end #/begin

-          duration = Time.now-start if logger || tracer
+          duration = Time.now - start

           if response.status.to_i >= 300
-            __log method, path, params, body, url, response, nil, 'N/A', duration if logger
+            __log_response method, path, params, body, url, response, nil, 'N/A', duration
             __trace method, path, params, headers, body, url, response, nil, 'N/A', duration if tracer

             # Log the failure only when `ignore` doesn't match the response status
-            __log_failed response if logger && !ignore.include?(response.status.to_i)
+            unless ignore.include?(response.status.to_i)
+              log_fatal "[#{response.status}] #{response.body}"
+            end

             __raise_transport_error response unless ignore.include?(response.status.to_i)
           end

           json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
-          took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer
+          took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'

-          __log method, path, params, body, url, response, json, took, duration if logger && !ignore.include?(response.status.to_i)
+          unless ignore.include?(response.status.to_i)
+            __log_response method, path, params, body, url, response, json, took, duration
+          end
+
           __trace method, path, params, headers, body, url, response, json, took, duration if tracer

           Response.new response.status, json || response.body, response.headers
         ensure
           @last_request_at = Time.now
@@ -353,6 +361,6 @@
          [Errno::ECONNREFUSED]
        end
      end
    end
  end
-end
+end
\ No newline at end of file
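
Note (not part of the upstream diff): the practical effect of the perform_request changes above is that retry behaviour in 7.0.0.pre is driven entirely by the transport-level options rather than a per-call opts hash, and an Integer retry_on_failure surfaces through the new max_retries accessor. A minimal sketch of how those options reach this code, assuming the stock Elasticsearch::Transport::Client wiring; the endpoint and values are illustrative:

require 'elasticsearch/transport'

# Options passed here end up in the transport's @options hash, which is what
# Base#initialize and Base#perform_request in the 7.0.0.pre code above read.
client = Elasticsearch::Transport::Client.new(
  host: 'http://localhost:9200',   # illustrative endpoint
  retry_on_failure: 5,             # Integer -> @max_retries; `true` falls back to DEFAULT_MAX_RETRIES (3)
  reload_on_failure: true,         # read as @options[:reload_on_failure] in the host-unreachable rescue
  retry_on_status: [502, 503],     # collected into @retry_on_status and retried via __raise_transport_error
  log: true                        # logging now flows through the Loggable helpers (log_info, log_warn, log_fatal, ...)
)

client.transport.max_retries       # => 5, exposed by the attr_accessor added in this release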