lib/elastic/transport/transport/base.rb in elastic-transport-8.1.3 vs lib/elastic/transport/transport/base.rb in elastic-transport-8.2.0

- old
+ new

@@ -22,16 +22,16 @@ # module Base include Loggable DEFAULT_PORT = 9200 - DEFAULT_PROTOCOL = 'http' + DEFAULT_PROTOCOL = 'http'.freeze DEFAULT_RELOAD_AFTER = 10_000 # Requests DEFAULT_RESURRECT_AFTER = 60 # Seconds DEFAULT_MAX_RETRIES = 3 # Requests DEFAULT_SERIALIZER_CLASS = Serializer::MultiJson - SANITIZED_PASSWORD = '*' * (rand(14)+1) + SANITIZED_PASSWORD = '*' * (rand(14) + 1) attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol attr_accessor :serializer, :sniffer, :logger, :tracer, :reload_connections, :reload_after, :resurrect_after @@ -57,11 +57,11 @@ @block = block @compression = !!@options[:compression] @connections = __build_connections - @serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) ) + @serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self)) @protocol = options[:protocol] || DEFAULT_PROTOCOL @logger = options[:logger] @tracer = options[:tracer] @@ -70,11 +70,11 @@ @counter_mtx = Mutex.new @last_request_at = Time.now @reload_connections = options[:reload_connections] @reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER - @retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i } + @retry_on_status = Array(options[:retry_on_status]).map(&:to_i) end # Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}. # # Resurrects dead connection if the `resurrect_after` timeout has passed. @@ -85,33 +85,33 @@ # def get_connection(options={}) resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after @counter_mtx.synchronize { @counter += 1 } - reload_connections! if reload_connections && counter % reload_after == 0 + reload_connections! if reload_connections && (counter % reload_after).zero? connections.get_connection(options) end # Reloads and replaces the connection collection based on cluster information # # @see Sniffer#hosts # def reload_connections! hosts = sniffer.hosts - __rebuild_connections :hosts => hosts, :options => options + __rebuild_connections(hosts: hosts, options: options) self rescue SnifferTimeoutError - log_error "[SnifferTimeoutError] Timeout when reloading connections." + log_error('[SnifferTimeoutError] Timeout when reloading connections.') self end # Tries to "resurrect" all eligible dead connections # # @see Connections::Connection#resurrect! # def resurrect_dead_connections! - connections.dead.each { |c| c.resurrect! } + connections.dead.each(&:resurrect!) end # Rebuilds the connections collection in the transport. # # The methods *adds* new connections from the passed hosts to the collection, @@ -126,11 +126,11 @@ @options = arguments[:options] || {} __close_connections new_connections = __build_connections - stale_connections = @connections.all.select { |c| ! new_connections.include?(c) } + stale_connections = @connections.all.reject { |c| new_connections.include?(c) } new_connections = new_connections.reject { |c| @connections.all.include?(c) } @connections.remove(stale_connections) @connections.add(new_connections) @connections @@ -175,12 +175,12 @@ # See {HTTP::Faraday#__build_connection} for an example. # # @return [Connections::Connection] # @api private # - def __build_connection(host, options={}, block=nil) - raise NoMethodError, "Implement this method in your class" + def __build_connection(host, options = {}, block = nil) + raise NoMethodError, 'Implement this method in your class' end # Closes the connections collection # # @api private @@ -207,18 +207,18 @@ # # @api private # def __trace(method, path, params, headers, body, url, response, json, took, duration) trace_url = "http://localhost:9200/#{path}?pretty" + - ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) + (params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}") trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' trace_command = "curl -X #{method.to_s.upcase}" - trace_command += " -H '#{headers.collect { |k,v| "#{k}: #{v}" }.join(", ")}'" if headers && !headers.empty? + trace_command += " -H '#{headers.collect { |k, v| "#{k}: #{v}" }.join(", ")}'" if headers && !headers.empty? trace_command += " '#{trace_url}'#{trace_body}\n" tracer.info trace_command tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" - tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" + tracer.debug json ? serializer.dump(json, pretty: true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" end # Raise error specific for the HTTP response status or a generic server error # # @api private @@ -274,67 +274,62 @@ start = Time.now tries = 0 reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure]) delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry]) - max_retries = if opts.key?(:retry_on_failure) - opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure] - elsif options.key?(:retry_on_failure) - options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure] - end + max_retries = max_retries(opts) || max_retries(options) params = params.clone - ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i } + # Transforms ignore status codes to Integer + ignore = Array(params.delete(:ignore)).compact.map(&:to_i) begin sleep(delay_on_retry / 1000.0) if tries > 0 - tries += 1 + tries += 1 connection = get_connection or raise Error.new('Cannot get new connection from pool.') - if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash) + if connection.connection.respond_to?(:params) && + connection.connection.params.respond_to?(:to_hash) params = connection.connection.params.merge(params.to_hash) end - url = connection.full_url(path, params) + url = connection.full_url(path, params) response = block.call(connection, url) - connection.healthy! if connection.failures > 0 + connection.healthy! if connection.failures.positive? # Raise an exception so we can catch it for `retry_on_status` - __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) + __raise_transport_error(response) if response.status.to_i >= 300 && + @retry_on_status.include?(response.status.to_i) rescue Elastic::Transport::Transport::ServerError => e - if response && @retry_on_status.include?(response.status) - log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}" - if tries <= (max_retries || DEFAULT_MAX_RETRIES) - retry - else - log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" - raise e - end + raise e unless response && @retry_on_status.include?(response.status) + + log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}" + if tries <= (max_retries || DEFAULT_MAX_RETRIES) + retry else + log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" raise e end rescue *host_unreachable_exceptions => e log_error "[#{e.class}] #{e.message} #{connection.host.inspect}" connection.dead! - if reload_on_failure and tries < connections.all.size + if reload_on_failure && tries < connections.all.size log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" reload_connections! and retry end exception = Elastic::Transport::Transport::Error.new(e.message) - if max_retries - log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" - if tries <= max_retries - retry - else - log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" - raise exception - end + raise exception unless max_retries + + log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" + if tries <= max_retries + retry else + log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" raise exception end rescue Exception => e log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" raise e @@ -349,12 +344,15 @@ # Log the failure only when `ignore` doesn't match the response status log_fatal("[#{response.status}] #{response.body}") unless ignore.include?(response.status.to_i) __raise_transport_error response unless ignore.include?(response.status.to_i) end - json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/ - took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' + json = serializer.load(response.body) if response.body && + !response.body.empty? && + response.headers && + response.headers["content-type"] =~ /json/ + took = (json['took'] ? sprintf('%.3fs', json['took'] / 1000.0) : 'n/a') rescue 'n/a' __log_response(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i) __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer log_warn(response.headers['warning']) if response.headers&.[]('warning') Response.new response.status, json || response.body, response.headers @@ -372,21 +370,25 @@ end private USER_AGENT_STR = 'User-Agent'.freeze - USER_AGENT_REGEX = /user\-?\_?agent/ + USER_AGENT_REGEX = /user-?_?agent/ ACCEPT_ENCODING = 'Accept-Encoding'.freeze CONTENT_ENCODING = 'Content-Encoding'.freeze CONTENT_TYPE_STR = 'Content-Type'.freeze - CONTENT_TYPE_REGEX = /content\-?\_?type/ + CONTENT_TYPE_REGEX = /content-?_?type/ DEFAULT_CONTENT_TYPE = 'application/json'.freeze GZIP = 'gzip'.freeze GZIP_FIRST_TWO_BYTES = '1f8b'.freeze HEX_STRING_DIRECTIVE = 'H*'.freeze RUBY_ENCODING = '1.9'.respond_to?(:force_encoding) + def max_retries(opts) + opts[:retry_on_failure] == true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure] + end + def compress_request(body, headers) if body headers ||= {} if gzipped?(body) @@ -409,11 +411,11 @@ def decompress_response(body) return body unless gzipped?(body) io = StringIO.new(body) gzip_reader = if RUBY_ENCODING - Zlib::GzipReader.new(io, :encoding => 'ASCII-8BIT') + Zlib::GzipReader.new(io, encoding: 'ASCII-8BIT') else Zlib::GzipReader.new(io) end gzip_reader.read end @@ -427,11 +429,11 @@ def use_compression? @compression end def apply_headers(client, options) - headers = options[:headers].clone || {} + headers = options[:headers] || {} headers[CONTENT_TYPE_STR] = find_value(headers, CONTENT_TYPE_REGEX) || DEFAULT_CONTENT_TYPE headers[USER_AGENT_STR] = find_value(headers, USER_AGENT_REGEX) || user_agent_header(client) client.headers[ACCEPT_ENCODING] = GZIP if use_compression? client.headers.merge!(headers) end @@ -442,21 +444,22 @@ hash.delete(key_value[0]) key_value[1] end end - def user_agent_header(client) + def user_agent_header(_client) @user_agent ||= begin meta = ["RUBY_VERSION: #{RUBY_VERSION}"] if RbConfig::CONFIG && RbConfig::CONFIG['host_os'] meta << "#{RbConfig::CONFIG['host_os'].split('_').first[/[a-z]+/i].downcase} #{RbConfig::CONFIG['target_cpu']}" end "elastic-transport-ruby/#{VERSION} (#{meta.join('; ')})" end end def connection_headers(connection) - if defined?(Elastic::Transport::Transport::HTTP::Manticore) && self.class == Elastic::Transport::Transport::HTTP::Manticore + if defined?(Elastic::Transport::Transport::HTTP::Manticore) && + instance_of?(Elastic::Transport::Transport::HTTP::Manticore) @request_options[:headers] else connection.connection.headers end end