lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-7.15.0 vs lib/elasticsearch/transport/transport/base.rb in elasticsearch-transport-7.16.0

- old
+ new

@@ -52,10 +52,11 @@ @hosts = arguments[:hosts] || [] @options = arguments[:options] || {} @options[:http] ||= {} @options[:retry_on_status] ||= [] + @options[:delay_on_retry] ||= 0 @block = block @compression = !!@options[:compression] @connections = __build_connections @@ -221,11 +222,11 @@ # Converts any non-String object to JSON # # @api private # def __convert_to_json(o=nil, options={}) - o = o.is_a?(String) ? o : serializer.dump(o, options) + o.is_a?(String) ? o : serializer.dump(o, options) end # Returns a full URL based on information from host # # @param host [Hash] Host configuration passed in from {Client} @@ -262,10 +263,11 @@ raise NoMethodError, 'Implement this method in your transport class' unless block_given? start = Time.now tries = 0 reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure]) + delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry]) max_retries = if opts.key?(:retry_on_failure) opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure] elsif options.key?(:retry_on_failure) options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure] @@ -273,10 +275,11 @@ params = params.clone ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i } begin + sleep(delay_on_retry / 1000.0) if tries > 0 tries += 1 connection = get_connection or raise Error.new('Cannot get new connection from pool.') if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash) params = connection.connection.params.merge(params.to_hash) @@ -304,11 +307,10 @@ rescue *host_unreachable_exceptions => e log_error "[#{e.class}] #{e.message} #{connection.host.inspect}" connection.dead! - if reload_on_failure and tries < connections.all.size log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" reload_connections! and retry end @@ -346,11 +348,11 @@ unless ignore.include?(response.status.to_i) __log_response method, path, params, body, url, response, json, took, duration end __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer - warnings(response.headers['warning']) if response.headers&.[]('warning') + log_warn(response.headers['warning']) if response.headers&.[]('warning') Response.new response.status, json || response.body, response.headers ensure @last_request_at = Time.now end @@ -365,21 +367,42 @@ private USER_AGENT_STR = 'User-Agent'.freeze USER_AGENT_REGEX = /user\-?\_?agent/ + ACCEPT_ENCODING = 'Accept-Encoding'.freeze + CONTENT_ENCODING = 'Content-Encoding'.freeze CONTENT_TYPE_STR = 'Content-Type'.freeze CONTENT_TYPE_REGEX = /content\-?\_?type/ DEFAULT_CONTENT_TYPE = 'application/json'.freeze GZIP = 'gzip'.freeze - ACCEPT_ENCODING = 'Accept-Encoding'.freeze GZIP_FIRST_TWO_BYTES = '1f8b'.freeze HEX_STRING_DIRECTIVE = 'H*'.freeze RUBY_ENCODING = '1.9'.respond_to?(:force_encoding) + def compress_request(body, headers) + if body + headers ||= {} + + if gzipped?(body) + headers[CONTENT_ENCODING] = GZIP + elsif use_compression? + headers[CONTENT_ENCODING] = GZIP + gzip = Zlib::GzipWriter.new(StringIO.new) + gzip << body + body = gzip.close.string + else + headers.delete(CONTENT_ENCODING) + end + elsif headers + headers.delete(CONTENT_ENCODING) + end + + [body, headers] + end + def decompress_response(body) - return body unless use_compression? return body unless gzipped?(body) io = StringIO.new(body) gzip_reader = if RUBY_ENCODING Zlib::GzipReader.new(io, :encoding => 'ASCII-8BIT') @@ -388,10 +411,12 @@ end gzip_reader.read end def gzipped?(body) + return unless body && !body.empty? + body[0..1].unpack(HEX_STRING_DIRECTIVE)[0] == GZIP_FIRST_TWO_BYTES end def use_compression? @compression @@ -419,13 +444,9 @@ if RbConfig::CONFIG && RbConfig::CONFIG['host_os'] meta << "#{RbConfig::CONFIG['host_os'].split('_').first[/[a-z]+/i].downcase} #{RbConfig::CONFIG['target_cpu']}" end "elasticsearch-ruby/#{VERSION} (#{meta.join('; ')})" end - end - - def warnings(warning) - warn("warning: #{warning}") end def connection_headers(connection) if defined?(Elasticsearch::Transport::Transport::HTTP::Manticore) && self.class == Elasticsearch::Transport::Transport::HTTP::Manticore @request_options[:headers]