lib/elastic/transport/transport/base.rb: elastic-transport 8.1.3 vs elastic-transport 8.2.0
- line as it appears in 8.1.3 (old)
+ line as it appears in 8.2.0 (new)
@@ -22,16 +22,16 @@
#
module Base
include Loggable
DEFAULT_PORT = 9200
- DEFAULT_PROTOCOL = 'http'
+ DEFAULT_PROTOCOL = 'http'.freeze
DEFAULT_RELOAD_AFTER = 10_000 # Requests
DEFAULT_RESURRECT_AFTER = 60 # Seconds
DEFAULT_MAX_RETRIES = 3 # Requests
DEFAULT_SERIALIZER_CLASS = Serializer::MultiJson
- SANITIZED_PASSWORD = '*' * (rand(14)+1)
+ SANITIZED_PASSWORD = '*' * (rand(14) + 1)
attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol
attr_accessor :serializer, :sniffer, :logger, :tracer,
:reload_connections, :reload_after,
:resurrect_after
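Aside from the added `.freeze` and spacing, nothing changes in this hunk. `SANITIZED_PASSWORD` is built once at load time with a random length of 1 to 14 asterisks, so sanitized log output does not even reveal how long the real password is. A standalone sketch of the idea (the substitution below is illustrative only; the gem's own log-sanitizing code is not part of this diff):

    masked = '*' * (rand(14) + 1)          # e.g. "*******", length varies per process
    url    = 'http://elastic:s3cr3t@localhost:9200/_search'
    url.sub(/:[^:@]+@/, ":#{masked}@")     # => "http://elastic:*******@localhost:9200/_search"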
@@ -57,11 +57,11 @@
@block = block
@compression = !!@options[:compression]
@connections = __build_connections
- @serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
+ @serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self))
@protocol = options[:protocol] || DEFAULT_PROTOCOL
@logger = options[:logger]
@tracer = options[:tracer]
@@ -70,11 +70,11 @@
@counter_mtx = Mutex.new
@last_request_at = Time.now
@reload_connections = options[:reload_connections]
@reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
@resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
- @retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
+ @retry_on_status = Array(options[:retry_on_status]).map(&:to_i)
end
# Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.
#
# Resurrects dead connections if the `resurrect_after` timeout has passed.
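One behavioural nicety in the constructor hunk above: `Array(...)` plus `map(&:to_i)` lets `retry_on_status` be given as a single value or a list, as integers or strings. Standalone examples:

    Array(nil).map(&:to_i)          # => []
    Array(502).map(&:to_i)          # => [502]
    Array(%w[502 503]).map(&:to_i)  # => [502, 503]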
@@ -85,33 +85,33 @@
#
def get_connection(options={})
resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after
@counter_mtx.synchronize { @counter += 1 }
- reload_connections! if reload_connections && counter % reload_after == 0
+ reload_connections! if reload_connections && (counter % reload_after).zero?
connections.get_connection(options)
end
# Reloads and replaces the connection collection based on cluster information
#
# @see Sniffer#hosts
#
def reload_connections!
hosts = sniffer.hosts
- __rebuild_connections :hosts => hosts, :options => options
+ __rebuild_connections(hosts: hosts, options: options)
self
rescue SnifferTimeoutError
- log_error "[SnifferTimeoutError] Timeout when reloading connections."
+ log_error('[SnifferTimeoutError] Timeout when reloading connections.')
self
end
# Tries to "resurrect" all eligible dead connections
#
# @see Connections::Connection#resurrect!
#
def resurrect_dead_connections!
- connections.dead.each { |c| c.resurrect! }
+ connections.dead.each(&:resurrect!)
end
# Rebuilds the connections collection in the transport.
#
# The method *adds* new connections from the passed hosts to the collection,
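A note on the `get_connection`/`reload_connections!` changes above: `(counter % reload_after).zero?` is equivalent to the old `counter % reload_after == 0`, so with the default `DEFAULT_RELOAD_AFTER` of 10_000 the node list is still re-sniffed on every 10,000th request. A standalone check:

    reload_after = 10_000
    (1..30_000).count { |counter| (counter % reload_after).zero? }  # => 3 (requests 10_000, 20_000 and 30_000)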
@@ -126,11 +126,11 @@
@options = arguments[:options] || {}
__close_connections
new_connections = __build_connections
- stale_connections = @connections.all.select { |c| ! new_connections.include?(c) }
+ stale_connections = @connections.all.reject { |c| new_connections.include?(c) }
new_connections = new_connections.reject { |c| @connections.all.include?(c) }
@connections.remove(stale_connections)
@connections.add(new_connections)
@connections
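The `select { |c| !new_connections.include?(c) }` to `reject { |c| new_connections.include?(c) }` swap in `__rebuild_connections` is purely mechanical; both compute the stale set as the old connections that no longer appear in the sniffed host list. A standalone equivalent:

    old_conns = ['http://node1:9200', 'http://node2:9200']
    new_conns = ['http://node2:9200', 'http://node3:9200']
    old_conns.reject { |c| new_conns.include?(c) }  # stale  => ["http://node1:9200"]
    new_conns.reject { |c| old_conns.include?(c) }  # to add => ["http://node3:9200"]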
@@ -175,12 +175,12 @@
# See {HTTP::Faraday#__build_connection} for an example.
#
# @return [Connections::Connection]
# @api private
#
- def __build_connection(host, options={}, block=nil)
- raise NoMethodError, "Implement this method in your class"
+ def __build_connection(host, options = {}, block = nil)
+ raise NoMethodError, 'Implement this method in your class'
end
# Closes the connections collection
#
# @api private
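`__build_connection` above stays an abstract hook that concrete transports override; only quoting style and argument spacing changed. A minimal, hypothetical subclass sketch, not the gem's actual Faraday implementation, just to show the contract:

    require 'net/http'
    require 'elastic/transport'

    class MyTransport
      include Elastic::Transport::Transport::Base

      # Hypothetical override: wrap whatever HTTP client you use in a Connection.
      def __build_connection(host, options = {}, block = nil)
        client = Net::HTTP.new(host[:host], host[:port] || DEFAULT_PORT)
        Elastic::Transport::Transport::Connections::Connection.new(
          host: host, connection: client, options: options
        )
      end
    end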
@@ -207,18 +207,18 @@
#
# @api private
#
def __trace(method, path, params, headers, body, url, response, json, took, duration)
trace_url = "http://localhost:9200/#{path}?pretty" +
- ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
+ (params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}")
trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
trace_command = "curl -X #{method.to_s.upcase}"
- trace_command += " -H '#{headers.collect { |k,v| "#{k}: #{v}" }.join(", ")}'" if headers && !headers.empty?
+ trace_command += " -H '#{headers.collect { |k, v| "#{k}: #{v}" }.join(", ")}'" if headers && !headers.empty?
trace_command += " '#{trace_url}'#{trace_body}\n"
tracer.info trace_command
tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
- tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
+ tracer.debug json ? serializer.dump(json, pretty: true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
end
# Raises an error specific to the HTTP response status, or a generic server error
#
# @api private
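For context on the `__trace` hunk above: the query string of the traced curl command comes from Faraday's `ParamsHash#to_query`, which is available at runtime since Faraday ships as a dependency of elastic-transport. A quick standalone illustration:

    require 'faraday'

    params    = { size: 1 }
    trace_url = 'http://localhost:9200/_search?pretty' +
                (params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}")
    # trace_url => "http://localhost:9200/_search?pretty&size=1"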
@@ -274,67 +274,62 @@
start = Time.now
tries = 0
reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry])
- max_retries = if opts.key?(:retry_on_failure)
- opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
- elsif options.key?(:retry_on_failure)
- options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
- end
+ max_retries = max_retries(opts) || max_retries(options)
params = params.clone
- ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }
+ # Transforms ignore status codes to Integer
+ ignore = Array(params.delete(:ignore)).compact.map(&:to_i)
begin
sleep(delay_on_retry / 1000.0) if tries > 0
- tries += 1
+ tries += 1
connection = get_connection or raise Error.new('Cannot get new connection from pool.')
- if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
+ if connection.connection.respond_to?(:params) &&
+ connection.connection.params.respond_to?(:to_hash)
params = connection.connection.params.merge(params.to_hash)
end
- url = connection.full_url(path, params)
+ url = connection.full_url(path, params)
response = block.call(connection, url)
- connection.healthy! if connection.failures > 0
+ connection.healthy! if connection.failures.positive?
# Raise an exception so we can catch it for `retry_on_status`
- __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
+ __raise_transport_error(response) if response.status.to_i >= 300 &&
+ @retry_on_status.include?(response.status.to_i)
rescue Elastic::Transport::Transport::ServerError => e
- if response && @retry_on_status.include?(response.status)
- log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
- if tries <= (max_retries || DEFAULT_MAX_RETRIES)
- retry
- else
- log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
- raise e
- end
+ raise e unless response && @retry_on_status.include?(response.status)
+
+ log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
+ if tries <= (max_retries || DEFAULT_MAX_RETRIES)
+ retry
else
+ log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
raise e
end
rescue *host_unreachable_exceptions => e
log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"
connection.dead!
- if reload_on_failure and tries < connections.all.size
+ if reload_on_failure && tries < connections.all.size
log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
reload_connections! and retry
end
exception = Elastic::Transport::Transport::Error.new(e.message)
- if max_retries
- log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
- if tries <= max_retries
- retry
- else
- log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
- raise exception
- end
+ raise exception unless max_retries
+
+ log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
+ if tries <= max_retries
+ retry
else
+ log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
raise exception
end
rescue Exception => e
log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
raise e
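The most substantive change in the hunk above is that the inline `retry_on_failure` resolution became `max_retries(opts) || max_retries(options)`, using the small private helper added further down in the file. Its behaviour, sketched standalone:

    DEFAULT_MAX_RETRIES = 3

    def max_retries(opts)
      opts[:retry_on_failure] == true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
    end

    max_retries(retry_on_failure: true)   # => 3   (true means "use the default budget")
    max_retries(retry_on_failure: 5)      # => 5
    max_retries({})                       # => nil (no per-request setting, fall back to the client options)

One subtle difference that seems to follow from the diff: a per-request `retry_on_failure: false` used to short-circuit via `opts.key?(:retry_on_failure)`, whereas `false || max_retries(options)` now falls through to the client-level setting.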
@@ -349,12 +344,15 @@
# Log the failure only when `ignore` doesn't match the response status
log_fatal("[#{response.status}] #{response.body}") unless ignore.include?(response.status.to_i)
__raise_transport_error response unless ignore.include?(response.status.to_i)
end
- json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
- took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'
+ json = serializer.load(response.body) if response.body &&
+ !response.body.empty? &&
+ response.headers &&
+ response.headers["content-type"] =~ /json/
+ took = (json['took'] ? sprintf('%.3fs', json['took'] / 1000.0) : 'n/a') rescue 'n/a'
__log_response(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i)
__trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer
log_warn(response.headers['warning']) if response.headers&.[]('warning')
Response.new response.status, json || response.body, response.headers
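Two small things about the response-handling hunk above: the body is only deserialized when the `content-type` header mentions JSON, and the `took` field (milliseconds reported by Elasticsearch) is formatted as seconds for the log line. The formatting in isolation:

    json = { 'took' => 42 }
    json['took'] ? sprintf('%.3fs', json['took'] / 1000.0) : 'n/a'   # => "0.042s"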
@@ -372,21 +370,25 @@
end
private
USER_AGENT_STR = 'User-Agent'.freeze
- USER_AGENT_REGEX = /user\-?\_?agent/
+ USER_AGENT_REGEX = /user-?_?agent/
ACCEPT_ENCODING = 'Accept-Encoding'.freeze
CONTENT_ENCODING = 'Content-Encoding'.freeze
CONTENT_TYPE_STR = 'Content-Type'.freeze
- CONTENT_TYPE_REGEX = /content\-?\_?type/
+ CONTENT_TYPE_REGEX = /content-?_?type/
DEFAULT_CONTENT_TYPE = 'application/json'.freeze
GZIP = 'gzip'.freeze
GZIP_FIRST_TWO_BYTES = '1f8b'.freeze
HEX_STRING_DIRECTIVE = 'H*'.freeze
RUBY_ENCODING = '1.9'.respond_to?(:force_encoding)
+ def max_retries(opts)
+ opts[:retry_on_failure] == true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
+ end
+
def compress_request(body, headers)
if body
headers ||= {}
if gzipped?(body)
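Stepping outside the diff for a moment: dropping the backslashes from `USER_AGENT_REGEX` and `CONTENT_TYPE_REGEX` above is behaviour-neutral, since `-` and `_` need no escaping in a Ruby regexp. The patterns still match the usual header spellings:

    CONTENT_TYPE_REGEX = /content-?_?type/
    ['content-type', 'content_type', 'contenttype'].map { |k| k.match?(CONTENT_TYPE_REGEX) }
    # => [true, true, true]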
@@ -409,11 +411,11 @@
def decompress_response(body)
return body unless gzipped?(body)
io = StringIO.new(body)
gzip_reader = if RUBY_ENCODING
- Zlib::GzipReader.new(io, :encoding => 'ASCII-8BIT')
+ Zlib::GzipReader.new(io, encoding: 'ASCII-8BIT')
else
Zlib::GzipReader.new(io)
end
gzip_reader.read
end
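The constants `GZIP_FIRST_TWO_BYTES = '1f8b'` and `HEX_STRING_DIRECTIVE = 'H*'` exist for the `gzipped?` check that guards `decompress_response` (the check itself sits outside this diff). The detection idea, standalone:

    require 'zlib'

    body = Zlib.gzip('{"query":{"match_all":{}}}')
    body[0..1].unpack('H*').first   # => "1f8b", the gzip magic bytes
    Zlib.gunzip(body)               # => '{"query":{"match_all":{}}}'

Also worth noting: `RUBY_ENCODING`, defined as `'1.9'.respond_to?(:force_encoding)`, is true on every Ruby the gem supports, so the `Zlib::GzipReader.new(io, encoding: 'ASCII-8BIT')` branch is the one that runs in practice.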
@@ -427,11 +429,11 @@
def use_compression?
@compression
end
def apply_headers(client, options)
- headers = options[:headers].clone || {}
+ headers = options[:headers] || {}
headers[CONTENT_TYPE_STR] = find_value(headers, CONTENT_TYPE_REGEX) || DEFAULT_CONTENT_TYPE
headers[USER_AGENT_STR] = find_value(headers, USER_AGENT_REGEX) || user_agent_header(client)
client.headers[ACCEPT_ENCODING] = GZIP if use_compression?
client.headers.merge!(headers)
end
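One consequence worth flagging, hedged since only this file is visible here: dropping `.clone` means `apply_headers` now works on the very hash the caller passed as `options[:headers]`, so the defaults it fills in become visible to that caller. An illustration of the difference:

    user_headers = { 'Authorization' => 'ApiKey ...' }

    headers = user_headers.clone          # 8.1.3 behaviour: a copy is modified
    headers['Content-Type'] = 'application/json'
    user_headers.key?('Content-Type')     # => false

    headers = user_headers                # 8.2.0 behaviour: the same object is modified
    headers['Content-Type'] = 'application/json'
    user_headers.key?('Content-Type')     # => true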
@@ -442,21 +444,22 @@
hash.delete(key_value[0])
key_value[1]
end
end
- def user_agent_header(client)
+ def user_agent_header(_client)
@user_agent ||= begin
meta = ["RUBY_VERSION: #{RUBY_VERSION}"]
if RbConfig::CONFIG && RbConfig::CONFIG['host_os']
meta << "#{RbConfig::CONFIG['host_os'].split('_').first[/[a-z]+/i].downcase} #{RbConfig::CONFIG['target_cpu']}"
end
"elastic-transport-ruby/#{VERSION} (#{meta.join('; ')})"
end
end
def connection_headers(connection)
- if defined?(Elastic::Transport::Transport::HTTP::Manticore) && self.class == Elastic::Transport::Transport::HTTP::Manticore
+ if defined?(Elastic::Transport::Transport::HTTP::Manticore) &&
+ instance_of?(Elastic::Transport::Transport::HTTP::Manticore)
@request_options[:headers]
else
connection.connection.headers
end
end
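Finally, renaming the parameter to `_client` just marks it as intentionally unused; the memoized User-Agent header itself is unchanged and comes out looking roughly like this (values depend on the running interpreter and platform, and the version string is hard-coded here in place of the gem's VERSION constant):

    require 'rbconfig'

    meta = ["RUBY_VERSION: #{RUBY_VERSION}"]
    if RbConfig::CONFIG && RbConfig::CONFIG['host_os']
      meta << "#{RbConfig::CONFIG['host_os'].split('_').first[/[a-z]+/i].downcase} #{RbConfig::CONFIG['target_cpu']}"
    end
    "elastic-transport-ruby/8.2.0 (#{meta.join('; ')})"
    # => e.g. "elastic-transport-ruby/8.2.0 (RUBY_VERSION: 3.1.4; linux x86_64)"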