# Licensed to Elasticsearch B.V under one or more agreements. # Elasticsearch B.V licenses this file to you under the Apache 2.0 License. # See the LICENSE file in the project root for more information module Elasticsearch module Transport module Transport # Handles node discovery ("sniffing") # class Sniffer PROTOCOL = 'http' attr_reader :transport attr_accessor :timeout # @param transport [Object] A transport instance # def initialize(transport) @transport = transport @timeout = transport.options[:sniffer_timeout] || 1 end # Retrieves the node list from the Elasticsearch's # [_Nodes Info API_](http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-info/) # and returns a normalized Array of information suitable for passing to transport. # # Shuffles the collection before returning it when the `randomize_hosts` option is set for transport. # # @return [Array] # @raise [SnifferTimeoutError] # def hosts Timeout::timeout(timeout, SnifferTimeoutError) do nodes = transport.perform_request('GET', '_nodes/http', {}, nil, nil, reload_on_failure: false).body hosts = nodes['nodes'].map do |id, info| if info[PROTOCOL] host, port = parse_publish_address(info[PROTOCOL]['publish_address']) { :id => id, :name => info['name'], :version => info['version'], :host => host, :port => port, :roles => info['roles'], :attributes => info['attributes'] } end end.compact hosts.shuffle! if transport.options[:randomize_hosts] hosts end end private def parse_publish_address(publish_address) # publish_address is in the format hostname/ip:port if publish_address =~ /\// parts = publish_address.partition('/') [ parts[0], parse_address_port(parts[2])[1] ] else parse_address_port(publish_address) end end def parse_address_port(publish_address) # address is ipv6 if publish_address =~ /[\[\]]/ if parts = publish_address.match(/\A\[(.+)\](?::(\d+))?\z/) [ parts[1], parts[2] ] end else publish_address.split(':') end end end end end end