lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-6.8.3 vs lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-7.0.0.pre
- old
+ new
@@ -1,16 +1,31 @@
-# Licensed to Elasticsearch B.V under one or more agreements.
-# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
-# See the LICENSE file in the project root for more information
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
module Elasticsearch
module Transport
module Transport
# @abstract Module with common functionality for transport implementations.
#
module Base
+ include Loggable
+
DEFAULT_PORT = 9200
DEFAULT_PROTOCOL = 'http'
DEFAULT_RELOAD_AFTER = 10_000 # Requests
DEFAULT_RESURRECT_AFTER = 60 # Seconds
DEFAULT_MAX_RETRIES = 3 # Requests
@@ -18,11 +33,11 @@
SANITIZED_PASSWORD = '*' * (rand(14)+1)
attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol
attr_accessor :serializer, :sniffer, :logger, :tracer,
:reload_connections, :reload_after,
- :resurrect_after
+ :resurrect_after, :max_retries
# Creates a new transport object
#
# @param arguments [Hash] Settings and options for the transport
# @param block [Proc] Lambda or Proc which can be evaluated in the context of the "session" object
@@ -34,11 +49,11 @@
#
def initialize(arguments={}, &block)
@state_mutex = Mutex.new
@hosts = arguments[:hosts] || []
- @options = arguments[:options] ? arguments[:options].dup : {}
+ @options = arguments[:options] || {}
@options[:http] ||= {}
@options[:retry_on_status] ||= []
@block = block
@connections = __build_connections
@@ -54,10 +69,11 @@
@counter_mtx = Mutex.new
@last_request_at = Time.now
@reload_connections = options[:reload_connections]
@reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
@resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
+ @max_retries = options[:retry_on_failure].is_a?(Integer) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES
@retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
end
# Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.
#
@@ -82,11 +98,11 @@
def reload_connections!
hosts = sniffer.hosts
__rebuild_connections :hosts => hosts, :options => options
self
rescue SnifferTimeoutError
- logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger
+ log_error "[SnifferTimeoutError] Timeout when reloading connections."
self
end
# Tries to "resurrect" all eligible dead connections
#
@@ -129,19 +145,19 @@
# @api private
#
def __build_connections
Connections::Collection.new \
:connections => hosts.map { |host|
- host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
- host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
- if (options[:user] || options[:http][:user]) && !host[:user]
- host[:user] ||= options[:user] || options[:http][:user]
- host[:password] ||= options[:password] || options[:http][:password]
- end
+ host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
+ host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
+ if (options[:user] || options[:http][:user]) && !host[:user]
+ host[:user] ||= options[:user] || options[:http][:user]
+ host[:password] ||= options[:password] || options[:http][:password]
+ end
- __build_connection(host, (options[:transport_options] || {}), @block)
- },
+ __build_connection(host, (options[:transport_options] || {}), @block)
+ },
:selector_class => options[:selector_class],
:selector => options[:selector]
end
# @abstract Build and return a connection.
@@ -165,33 +181,27 @@
# Log request and response information
#
# @api private
#
- def __log(method, path, params, body, url, response, json, took, duration)
- sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
- logger.info "#{method.to_s.upcase} #{sanitized_url} " +
- "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
- logger.debug "> #{__convert_to_json(body)}" if body
- logger.debug "< #{response.body}"
+ def __log_response(method, path, params, body, url, response, json, took, duration)
+ if logger
+ sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
+ log_info "#{method.to_s.upcase} #{sanitized_url} " +
+ "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
+ log_debug "> #{__convert_to_json(body)}" if body
+ log_debug "< #{response.body}"
+ end
end
- # Log failed request
- #
- # @api private
- #
- def __log_failed(response)
- logger.fatal "[#{response.status}] #{response.body}"
- end
-
# Trace the request in the `curl` format
#
# @api private
#
def __trace(method, path, params, headers, body, url, response, json, took, duration)
trace_url = "http://localhost:9200/#{path}?pretty" +
- ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
+ ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
trace_command = "curl -X #{method.to_s.upcase}"
trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty?
trace_command += " '#{trace_url}'#{trace_body}\n"
tracer.info trace_command
@@ -245,22 +255,15 @@
# @return [Response]
# @raise [NoMethodError] If no block is passed
# @raise [ServerError] If request failed on server
# @raise [Error] If no connection is available
#
- def perform_request(method, path, params={}, body=nil, headers=nil, opts={}, &block)
+ def perform_request(method, path, params={}, body=nil, headers=nil, &block)
raise NoMethodError, "Implement this method in your transport class" unless block_given?
- start = Time.now if logger || tracer
+ start = Time.now
tries = 0
- reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
- max_retries = if opts.key?(:retry_on_failure)
- opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
- elsif options.key?(:retry_on_failure)
- options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
- end
-
params = params.clone
ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }
begin
@@ -279,66 +282,71 @@
# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
rescue Elasticsearch::Transport::Transport::ServerError => e
- if response && @retry_on_status.include?(response.status)
- logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
- if tries <= (max_retries || DEFAULT_MAX_RETRIES)
+ if @retry_on_status.include?(response.status)
+ log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
+ if tries <= max_retries
retry
else
- logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
+ log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
raise e
end
else
raise e
end
rescue *host_unreachable_exceptions => e
- logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger
+ log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"
connection.dead!
- if reload_on_failure and tries < connections.all.size
- logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
+ if @options[:reload_on_failure] and tries < connections.all.size
+ log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
reload_connections! and retry
end
- if max_retries
- logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
+ if @options[:retry_on_failure]
+ log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
if tries <= max_retries
retry
else
- logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
+ log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
raise e
end
else
raise e
end
rescue Exception => e
- logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger
+ log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
raise e
end #/begin
- duration = Time.now-start if logger || tracer
+ duration = Time.now - start
if response.status.to_i >= 300
- __log method, path, params, body, url, response, nil, 'N/A', duration if logger
+ __log_response method, path, params, body, url, response, nil, 'N/A', duration
__trace method, path, params, headers, body, url, response, nil, 'N/A', duration if tracer
# Log the failure only when `ignore` doesn't match the response status
- __log_failed response if logger && !ignore.include?(response.status.to_i)
+ unless ignore.include?(response.status.to_i)
+ log_fatal "[#{response.status}] #{response.body}"
+ end
__raise_transport_error response unless ignore.include?(response.status.to_i)
end
json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
- took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer
+ took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'
- __log method, path, params, body, url, response, json, took, duration if logger && !ignore.include?(response.status.to_i)
+ unless ignore.include?(response.status.to_i)
+ __log_response method, path, params, body, url, response, json, took, duration
+ end
+
__trace method, path, params, headers, body, url, response, json, took, duration if tracer
Response.new response.status, json || response.body, response.headers
ensure
@last_request_at = Time.now
@@ -353,6 +361,6 @@
[Errno::ECONNREFUSED]
end
end
end
end
-end
+end
\ No newline at end of file