lib/elastic/transport/transport/base.rb in elastic-transport-8.0.0.pre1 vs lib/elastic/transport/transport/base.rb in elastic-transport-8.0.0.pre2

- old
+ new

@@ -51,10 +51,11 @@ @hosts = arguments[:hosts] || [] @options = arguments[:options] || {} @options[:http] ||= {} @options[:retry_on_status] ||= [] + @options[:delay_on_retry] ||= 0 @block = block @compression = !!@options[:compression] @connections = __build_connections @@ -230,11 +231,11 @@ # Converts any non-String object to JSON # # @api private # def __convert_to_json(o=nil, options={}) - o = o.is_a?(String) ? o : serializer.dump(o, options) + o.is_a?(String) ? o : serializer.dump(o, options) end # Returns a full URL based on information from host # # @param host [Hash] Host configuration passed in from {Client} @@ -271,38 +272,36 @@ raise NoMethodError, 'Implement this method in your transport class' unless block_given? start = Time.now tries = 0 reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure]) + delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry]) max_retries = if opts.key?(:retry_on_failure) opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure] elsif options.key?(:retry_on_failure) options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure] end params = params.clone - ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i } begin + sleep(delay_on_retry / 1000.0) if tries > 0 tries += 1 connection = get_connection or raise Error.new('Cannot get new connection from pool.') if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash) params = connection.connection.params.merge(params.to_hash) end url = connection.full_url(path, params) - response = block.call(connection, url) - connection.healthy! if connection.failures > 0 # Raise an exception so we can catch it for `retry_on_status` __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) - rescue Elastic::Transport::Transport::ServerError => e if response && @retry_on_status.include?(response.status) log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if tries <= (max_retries || DEFAULT_MAX_RETRIES) retry @@ -311,11 +310,10 @@ raise e end else raise e end - rescue *host_unreachable_exceptions => e log_error "[#{e.class}] #{e.message} #{connection.host.inspect}" connection.dead! @@ -333,42 +331,32 @@ raise e end else raise e end - rescue Exception => e log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" raise e - end #/begin duration = Time.now - start if response.status.to_i >= 300 - __log_response method, path, params, body, url, response, nil, 'N/A', duration - __trace method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer + __log_response(method, path, params, body, url, response, nil, 'N/A', duration) + __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer # Log the failure only when `ignore` doesn't match the response status - unless ignore.include?(response.status.to_i) - log_fatal "[#{response.status}] #{response.body}" - end - + log_fatal("[#{response.status}] #{response.body}") unless ignore.include?(response.status.to_i) __raise_transport_error response unless ignore.include?(response.status.to_i) end json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/ took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' + __log_response(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i) + __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer + log_warn(response.headers['warning']) if response.headers&.[]('warning') - unless ignore.include?(response.status.to_i) - __log_response method, path, params, body, url, response, json, took, duration - end - - __trace method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer - - warnings(response.headers['warning']) if response.headers&.[]('warning') - Response.new response.status, json || response.body, response.headers ensure @last_request_at = Time.now end @@ -383,21 +371,42 @@ private USER_AGENT_STR = 'User-Agent'.freeze USER_AGENT_REGEX = /user\-?\_?agent/ + ACCEPT_ENCODING = 'Accept-Encoding'.freeze + CONTENT_ENCODING = 'Content-Encoding'.freeze CONTENT_TYPE_STR = 'Content-Type'.freeze CONTENT_TYPE_REGEX = /content\-?\_?type/ DEFAULT_CONTENT_TYPE = 'application/json'.freeze GZIP = 'gzip'.freeze - ACCEPT_ENCODING = 'Accept-Encoding'.freeze GZIP_FIRST_TWO_BYTES = '1f8b'.freeze HEX_STRING_DIRECTIVE = 'H*'.freeze RUBY_ENCODING = '1.9'.respond_to?(:force_encoding) + def compress_request(body, headers) + if body + headers ||= {} + + if gzipped?(body) + headers[CONTENT_ENCODING] = GZIP + elsif use_compression? + headers[CONTENT_ENCODING] = GZIP + gzip = Zlib::GzipWriter.new(StringIO.new) + gzip << body + body = gzip.close.string + else + headers.delete(CONTENT_ENCODING) + end + elsif headers + headers.delete(CONTENT_ENCODING) + end + + [body, headers] + end + def decompress_response(body) - return body unless use_compression? return body unless gzipped?(body) io = StringIO.new(body) gzip_reader = if RUBY_ENCODING Zlib::GzipReader.new(io, :encoding => 'ASCII-8BIT') @@ -406,10 +415,12 @@ end gzip_reader.read end def gzipped?(body) + return unless body && !body.empty? + body[0..1].unpack(HEX_STRING_DIRECTIVE)[0] == GZIP_FIRST_TWO_BYTES end def use_compression? @compression @@ -439,11 +450,15 @@ end "elastic-transport-ruby/#{VERSION} (#{meta.join('; ')})" end end - def warnings(warning) - warn("warning: #{warning}") + def connection_headers(connection) + if defined?(Elastic::Transport::Transport::HTTP::Manticore) && self.class == Elastic::Transport::Transport::HTTP::Manticore + @request_options[:headers] + else + connection.connection.headers + end end end end end end