lib/elasticsearch/transport/client.rb in elasticsearch-transport-6.1.0 vs lib/elasticsearch/transport/client.rb in elasticsearch-transport-6.2.0

- old
+ new

@@ -22,10 +22,15 @@ logger.progname = 'elasticsearch.tracer' logger.formatter = proc { |severity, datetime, progname, msg| "#{msg}\n" } logger end + # The default host and port to use if not otherwise specified. + # + # @since 7.0.0 + DEFAULT_HOST = 'localhost:9200'.freeze + # Returns the transport object. # # @see Elasticsearch::Transport::Transport::Base # @see Elasticsearch::Transport::Transport::HTTP::Faraday # @@ -81,58 +86,67 @@ # (Default: GET) # # @yield [faraday] Access and configure the `Faraday::Connection` instance directly with a block # def initialize(arguments={}, &block) + @options = arguments @arguments = arguments - - hosts = @arguments[:hosts] || \ - @arguments[:host] || \ - @arguments[:url] || \ - @arguments[:urls] || \ - ENV.fetch('ELASTICSEARCH_URL', 'localhost:9200') - @arguments[:logger] ||= @arguments[:log] ? DEFAULT_LOGGER.call() : nil @arguments[:tracer] ||= @arguments[:trace] ? DEFAULT_TRACER.call() : nil @arguments[:reload_connections] ||= false @arguments[:retry_on_failure] ||= false @arguments[:reload_on_failure] ||= false @arguments[:randomize_hosts] ||= false @arguments[:transport_options] ||= {} @arguments[:http] ||= {} + @options[:http] ||= {} - @arguments[:transport_options].update(:request => { :timeout => @arguments[:request_timeout] } ) if @arguments[:request_timeout] + @seeds = __extract_hosts(@arguments[:hosts] || + @arguments[:host] || + @arguments[:url] || + @arguments[:urls] || + ENV['ELASTICSEARCH_URL'] || + DEFAULT_HOST) - @arguments[:transport_options][:headers] ||= {} - @arguments[:transport_options][:headers].update 'Content-Type' => 'application/json' unless @arguments[:transport_options][:headers].keys.any? {|k| k.to_s.downcase =~ /content\-?\_?type/} - @send_get_body_as = @arguments[:send_get_body_as] || 'GET' - transport_class = @arguments[:transport_class] || DEFAULT_TRANSPORT_CLASS + if @arguments[:request_timeout] + @arguments[:transport_options][:request] = { :timeout => @arguments[:request_timeout] } + end - @transport = @arguments[:transport] || begin + @arguments[:transport_options][:headers] ||= {} + + unless @arguments[:transport_options][:headers].keys.any? {|k| k.to_s.downcase =~ /content\-?\_?type/} + @arguments[:transport_options][:headers]['Content-Type'] = 'application/json' + end + + if @arguments[:transport] + @transport = @arguments[:transport] + else + transport_class = @arguments[:transport_class] || DEFAULT_TRANSPORT_CLASS if transport_class == Transport::HTTP::Faraday - transport_class.new(:hosts => __extract_hosts(hosts, @arguments), :options => @arguments) do |faraday| + @transport = transport_class.new(:hosts => @seeds, :options => @arguments) do |faraday| block.call faraday if block unless (h = faraday.builder.handlers.last) && h.name.start_with?("Faraday::Adapter") faraday.adapter(@arguments[:adapter] || __auto_detect_adapter) end end else - transport_class.new(:hosts => __extract_hosts(hosts, @arguments), :options => @arguments) + @transport = transport_class.new(:hosts => @seeds, :options => @arguments) end end end # Performs a request through delegation to {#transport}. # def perform_request(method, path, params={}, body=nil, headers=nil) method = @send_get_body_as if 'GET' == method && body - - transport.perform_request method, path, params, body, headers + transport.perform_request(method, path, params, body, headers) end + private + # Normalizes and returns hosts configuration. # # Arrayifies the `hosts_config` argument and extracts `host` and `port` info from strings. # Performs shuffling when the `randomize_hosts` option is set. # @@ -141,55 +155,60 @@ # @return [Array<Hash>] # @raise [ArgumentError] # # @api private # - def __extract_hosts(hosts_config, options={}) - if hosts_config.is_a?(Hash) - hosts = [ hosts_config ] - else - if hosts_config.is_a?(String) && hosts_config.include?(',') - hosts = hosts_config.split(/\s*,\s*/) - else - hosts = Array(hosts_config) - end - end + def __extract_hosts(hosts_config) + hosts = case hosts_config + when String + hosts_config.split(',').map { |h| h.strip! || h } + when Array + hosts_config + when Hash, URI + [ hosts_config ] + else + Array(hosts_config) + end - result = hosts.map do |host| - host_parts = case host - when String - if host =~ /^[a-z]+\:\/\// - uri = URI.parse(host) - { :scheme => uri.scheme, :user => uri.user, :password => uri.password, :host => uri.host, :path => uri.path, :port => uri.port } - else - host, port = host.split(':') - { :host => host, :port => port } - end - when URI - { :scheme => host.scheme, :user => host.user, :password => host.password, :host => host.host, :path => host.path, :port => host.port } - when Hash - host - else - raise ArgumentError, "Please pass host as a String, URI or Hash -- #{host.class} given." - end + host_list = hosts.map { |host| __parse_host(host) } + @options[:randomize_hosts] ? host_list.shuffle! : host_list + end - host_parts[:port] = host_parts[:port].to_i unless host_parts[:port].nil? + def __parse_host(host) + host_parts = case host + when String + if host =~ /^[a-z]+\:\/\// + uri = URI.parse(host) + { :scheme => uri.scheme, + :user => uri.user, + :password => uri.password, + :host => uri.host, + :path => uri.path, + :port => uri.port } + else + host, port = host.split(':') + { :host => host, + :port => port } + end + when URI + { :scheme => host.scheme, + :user => host.user, + :password => host.password, + :host => host.host, + :path => host.path, + :port => host.port } + when Hash + host + else + raise ArgumentError, "Please pass host as a String, URI or Hash -- #{host.class} given." + end - # Transfer the selected host parts such as authentication credentials to `options`, - # so we can re-use them when reloading connections - # - host_parts.select { |k,v| [:scheme, :port, :user, :password].include?(k) }.each do |k,v| - @arguments[:http][k] ||= v - end + @options[:http][:user] ||= host_parts[:user] + @options[:http][:password] ||= host_parts[:password] - # Remove the trailing slash - host_parts[:path].chomp!('/') if host_parts[:path] - - host_parts - end - - result.shuffle! if options[:randomize_hosts] - result + host_parts[:port] = host_parts[:port].to_i if host_parts[:port] + host_parts[:path].chomp!('/') if host_parts[:path] + host_parts end # Auto-detect the best adapter (HTTP "driver") available, based on libraries # loaded by the user, preferring those with persistent connections # ("keep-alive") by default