lib/elasticsearch/transport/client.rb in elasticsearch-transport-6.1.0 vs lib/elasticsearch/transport/client.rb in elasticsearch-transport-6.2.0
- old
+ new
@@ -22,10 +22,15 @@
logger.progname = 'elasticsearch.tracer'
logger.formatter = proc { |severity, datetime, progname, msg| "#{msg}\n" }
logger
end
+ # The default host and port to use if not otherwise specified.
+ #
+ # @since 7.0.0
+ DEFAULT_HOST = 'localhost:9200'.freeze
+
# Returns the transport object.
#
# @see Elasticsearch::Transport::Transport::Base
# @see Elasticsearch::Transport::Transport::HTTP::Faraday
#
@@ -81,58 +86,67 @@
# (Default: GET)
#
# @yield [faraday] Access and configure the `Faraday::Connection` instance directly with a block
#
def initialize(arguments={}, &block)
+ @options = arguments
@arguments = arguments
-
- hosts = @arguments[:hosts] || \
- @arguments[:host] || \
- @arguments[:url] || \
- @arguments[:urls] || \
- ENV.fetch('ELASTICSEARCH_URL', 'localhost:9200')
-
@arguments[:logger] ||= @arguments[:log] ? DEFAULT_LOGGER.call() : nil
@arguments[:tracer] ||= @arguments[:trace] ? DEFAULT_TRACER.call() : nil
@arguments[:reload_connections] ||= false
@arguments[:retry_on_failure] ||= false
@arguments[:reload_on_failure] ||= false
@arguments[:randomize_hosts] ||= false
@arguments[:transport_options] ||= {}
@arguments[:http] ||= {}
+ @options[:http] ||= {}
- @arguments[:transport_options].update(:request => { :timeout => @arguments[:request_timeout] } ) if @arguments[:request_timeout]
+ @seeds = __extract_hosts(@arguments[:hosts] ||
+ @arguments[:host] ||
+ @arguments[:url] ||
+ @arguments[:urls] ||
+ ENV['ELASTICSEARCH_URL'] ||
+ DEFAULT_HOST)
- @arguments[:transport_options][:headers] ||= {}
- @arguments[:transport_options][:headers].update 'Content-Type' => 'application/json' unless @arguments[:transport_options][:headers].keys.any? {|k| k.to_s.downcase =~ /content\-?\_?type/}
-
@send_get_body_as = @arguments[:send_get_body_as] || 'GET'
- transport_class = @arguments[:transport_class] || DEFAULT_TRANSPORT_CLASS
+ if @arguments[:request_timeout]
+ @arguments[:transport_options][:request] = { :timeout => @arguments[:request_timeout] }
+ end
- @transport = @arguments[:transport] || begin
+ @arguments[:transport_options][:headers] ||= {}
+
+ unless @arguments[:transport_options][:headers].keys.any? {|k| k.to_s.downcase =~ /content\-?\_?type/}
+ @arguments[:transport_options][:headers]['Content-Type'] = 'application/json'
+ end
+
+ if @arguments[:transport]
+ @transport = @arguments[:transport]
+ else
+ transport_class = @arguments[:transport_class] || DEFAULT_TRANSPORT_CLASS
if transport_class == Transport::HTTP::Faraday
- transport_class.new(:hosts => __extract_hosts(hosts, @arguments), :options => @arguments) do |faraday|
+ @transport = transport_class.new(:hosts => @seeds, :options => @arguments) do |faraday|
block.call faraday if block
unless (h = faraday.builder.handlers.last) && h.name.start_with?("Faraday::Adapter")
faraday.adapter(@arguments[:adapter] || __auto_detect_adapter)
end
end
else
- transport_class.new(:hosts => __extract_hosts(hosts, @arguments), :options => @arguments)
+ @transport = transport_class.new(:hosts => @seeds, :options => @arguments)
end
end
end
# Performs a request through delegation to {#transport}.
#
def perform_request(method, path, params={}, body=nil, headers=nil)
method = @send_get_body_as if 'GET' == method && body
-
- transport.perform_request method, path, params, body, headers
+ transport.perform_request(method, path, params, body, headers)
end
+ private
+
# Normalizes and returns hosts configuration.
#
# Arrayifies the `hosts_config` argument and extracts `host` and `port` info from strings.
# Performs shuffling when the `randomize_hosts` option is set.
#
@@ -141,55 +155,60 @@
# @return [Array<Hash>]
# @raise [ArgumentError]
#
# @api private
#
- def __extract_hosts(hosts_config, options={})
- if hosts_config.is_a?(Hash)
- hosts = [ hosts_config ]
- else
- if hosts_config.is_a?(String) && hosts_config.include?(',')
- hosts = hosts_config.split(/\s*,\s*/)
- else
- hosts = Array(hosts_config)
- end
- end
+ def __extract_hosts(hosts_config)
+ hosts = case hosts_config
+ when String
+ hosts_config.split(',').map { |h| h.strip! || h }
+ when Array
+ hosts_config
+ when Hash, URI
+ [ hosts_config ]
+ else
+ Array(hosts_config)
+ end
- result = hosts.map do |host|
- host_parts = case host
- when String
- if host =~ /^[a-z]+\:\/\//
- uri = URI.parse(host)
- { :scheme => uri.scheme, :user => uri.user, :password => uri.password, :host => uri.host, :path => uri.path, :port => uri.port }
- else
- host, port = host.split(':')
- { :host => host, :port => port }
- end
- when URI
- { :scheme => host.scheme, :user => host.user, :password => host.password, :host => host.host, :path => host.path, :port => host.port }
- when Hash
- host
- else
- raise ArgumentError, "Please pass host as a String, URI or Hash -- #{host.class} given."
- end
+ host_list = hosts.map { |host| __parse_host(host) }
+ @options[:randomize_hosts] ? host_list.shuffle! : host_list
+ end
- host_parts[:port] = host_parts[:port].to_i unless host_parts[:port].nil?
+ def __parse_host(host)
+ host_parts = case host
+ when String
+ if host =~ /^[a-z]+\:\/\//
+ uri = URI.parse(host)
+ { :scheme => uri.scheme,
+ :user => uri.user,
+ :password => uri.password,
+ :host => uri.host,
+ :path => uri.path,
+ :port => uri.port }
+ else
+ host, port = host.split(':')
+ { :host => host,
+ :port => port }
+ end
+ when URI
+ { :scheme => host.scheme,
+ :user => host.user,
+ :password => host.password,
+ :host => host.host,
+ :path => host.path,
+ :port => host.port }
+ when Hash
+ host
+ else
+ raise ArgumentError, "Please pass host as a String, URI or Hash -- #{host.class} given."
+ end
- # Transfer the selected host parts such as authentication credentials to `options`,
- # so we can re-use them when reloading connections
- #
- host_parts.select { |k,v| [:scheme, :port, :user, :password].include?(k) }.each do |k,v|
- @arguments[:http][k] ||= v
- end
+ @options[:http][:user] ||= host_parts[:user]
+ @options[:http][:password] ||= host_parts[:password]
- # Remove the trailing slash
- host_parts[:path].chomp!('/') if host_parts[:path]
-
- host_parts
- end
-
- result.shuffle! if options[:randomize_hosts]
- result
+ host_parts[:port] = host_parts[:port].to_i if host_parts[:port]
+ host_parts[:path].chomp!('/') if host_parts[:path]
+ host_parts
end
# Auto-detect the best adapter (HTTP "driver") available, based on libraries
# loaded by the user, preferring those with persistent connections
# ("keep-alive") by default