lib/elastic/transport/transport/base.rb in elastic-transport-8.0.0.pre1 vs lib/elastic/transport/transport/base.rb in elastic-transport-8.0.0.pre2
- old
+ new
@@ -51,10 +51,11 @@
@hosts = arguments[:hosts] || []
@options = arguments[:options] || {}
@options[:http] ||= {}
@options[:retry_on_status] ||= []
+ @options[:delay_on_retry] ||= 0
@block = block
@compression = !!@options[:compression]
@connections = __build_connections
@@ -230,11 +231,11 @@
# Converts any non-String object to JSON
#
# @api private
#
def __convert_to_json(o=nil, options={})
- o = o.is_a?(String) ? o : serializer.dump(o, options)
+ o.is_a?(String) ? o : serializer.dump(o, options)
end
# Returns a full URL based on information from host
#
# @param host [Hash] Host configuration passed in from {Client}
@@ -271,38 +272,36 @@
raise NoMethodError, 'Implement this method in your transport class' unless block_given?
start = Time.now
tries = 0
reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
+ delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry])
max_retries = if opts.key?(:retry_on_failure)
opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
elsif options.key?(:retry_on_failure)
options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
end
params = params.clone
-
ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }
begin
+ sleep(delay_on_retry / 1000.0) if tries > 0
tries += 1
connection = get_connection or raise Error.new('Cannot get new connection from pool.')
if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
params = connection.connection.params.merge(params.to_hash)
end
url = connection.full_url(path, params)
-
response = block.call(connection, url)
-
connection.healthy! if connection.failures > 0
# Raise an exception so we can catch it for `retry_on_status`
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
-
rescue Elastic::Transport::Transport::ServerError => e
if response && @retry_on_status.include?(response.status)
log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
if tries <= (max_retries || DEFAULT_MAX_RETRIES)
retry
@@ -311,11 +310,10 @@
raise e
end
else
raise e
end
-
rescue *host_unreachable_exceptions => e
log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"
connection.dead!
@@ -333,42 +331,32 @@
raise e
end
else
raise e
end
-
rescue Exception => e
log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
raise e
-
end #/begin
duration = Time.now - start
if response.status.to_i >= 300
- __log_response method, path, params, body, url, response, nil, 'N/A', duration
- __trace method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer
+ __log_response(method, path, params, body, url, response, nil, 'N/A', duration)
+ __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer
# Log the failure only when `ignore` doesn't match the response status
- unless ignore.include?(response.status.to_i)
- log_fatal "[#{response.status}] #{response.body}"
- end
-
+ log_fatal("[#{response.status}] #{response.body}") unless ignore.include?(response.status.to_i)
__raise_transport_error response unless ignore.include?(response.status.to_i)
end
json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'
+ __log_response(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i)
+ __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer
+ log_warn(response.headers['warning']) if response.headers&.[]('warning')
- unless ignore.include?(response.status.to_i)
- __log_response method, path, params, body, url, response, json, took, duration
- end
-
- __trace method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer
-
- warnings(response.headers['warning']) if response.headers&.[]('warning')
-
Response.new response.status, json || response.body, response.headers
ensure
@last_request_at = Time.now
end
@@ -383,21 +371,42 @@
private
USER_AGENT_STR = 'User-Agent'.freeze
USER_AGENT_REGEX = /user\-?\_?agent/
+ ACCEPT_ENCODING = 'Accept-Encoding'.freeze
+ CONTENT_ENCODING = 'Content-Encoding'.freeze
CONTENT_TYPE_STR = 'Content-Type'.freeze
CONTENT_TYPE_REGEX = /content\-?\_?type/
DEFAULT_CONTENT_TYPE = 'application/json'.freeze
GZIP = 'gzip'.freeze
- ACCEPT_ENCODING = 'Accept-Encoding'.freeze
GZIP_FIRST_TWO_BYTES = '1f8b'.freeze
HEX_STRING_DIRECTIVE = 'H*'.freeze
RUBY_ENCODING = '1.9'.respond_to?(:force_encoding)
+ def compress_request(body, headers)
+ if body
+ headers ||= {}
+
+ if gzipped?(body)
+ headers[CONTENT_ENCODING] = GZIP
+ elsif use_compression?
+ headers[CONTENT_ENCODING] = GZIP
+ gzip = Zlib::GzipWriter.new(StringIO.new)
+ gzip << body
+ body = gzip.close.string
+ else
+ headers.delete(CONTENT_ENCODING)
+ end
+ elsif headers
+ headers.delete(CONTENT_ENCODING)
+ end
+
+ [body, headers]
+ end
+
def decompress_response(body)
- return body unless use_compression?
return body unless gzipped?(body)
io = StringIO.new(body)
gzip_reader = if RUBY_ENCODING
Zlib::GzipReader.new(io, :encoding => 'ASCII-8BIT')
@@ -406,10 +415,12 @@
end
gzip_reader.read
end
def gzipped?(body)
+ return unless body && !body.empty?
+
body[0..1].unpack(HEX_STRING_DIRECTIVE)[0] == GZIP_FIRST_TWO_BYTES
end
def use_compression?
@compression
@@ -439,11 +450,15 @@
end
"elastic-transport-ruby/#{VERSION} (#{meta.join('; ')})"
end
end
- def warnings(warning)
- warn("warning: #{warning}")
+ def connection_headers(connection)
+ if defined?(Elastic::Transport::Transport::HTTP::Manticore) && self.class == Elastic::Transport::Transport::HTTP::Manticore
+ @request_options[:headers]
+ else
+ connection.connection.headers
+ end
end
end
end
end
end