require "net/http" require "resolv" module MCollective module Util class Choria class UserError < StandardError; end class Abort < StandardError; end unless defined?(Choria::VERSION) # rubocop:disable Style/IfUnlessModifier VERSION = "0.19.0".freeze end attr_writer :ca def initialize(check_ssl=true) @config = Config.instance check_ssl_setup if check_ssl end # Determines the configured path to the NATS credentials, empty when not set # # @return [String] def credential_file get_option("nats.credentials", "") end # Determines if a credential file is configured # # @return [Boolean] def credential_file? credential_file != "" end # Determines if we are connecting to NGS based on credentials and the nats.ngs setting # # @return [Boolean] def ngs? credential_file != "" && Util.str_to_bool(get_option("nats.ngs", "false")) end # Attempts to load the optional nkeys library # # @return [Boolean] def nkeys? require "nkeys" true rescue LoadError false end # Creates a new TasksSupport instance with the configured cache dir # # @return [TasksSupport] def tasks_support require_relative "tasks_support" Util::TasksSupport.new(self, tasks_cache_dir) end # Determines the Tasks Cache dir # # @return [String] path to the cache def tasks_cache_dir if Util.windows? File.join(Util.windows_prefix, "tasks-cache") elsif Process.uid == 0 "/opt/puppetlabs/mcollective/tasks-cache" else File.expand_path("~/.puppetlabs/mcollective/tasks-cache") end end # Determines the Tasks Spool directory # # @return [String] path to the spool def tasks_spool_dir if Util.windows? File.join(Util.windows_prefix, "tasks-spool") elsif Process.uid == 0 "/opt/puppetlabs/mcollective/tasks-spool" else File.expand_path("~/.puppetlabs/mcollective/tasks-spool") end end # Determines if there are any federations configured # # @return [Boolean] def federated? !federation_collectives.empty? end # List of active collectives that form the federation # # @return [Array] def federation_collectives if !@config.federations.empty? @config.federations elsif (override_networks = env_fetch("CHORIA_FED_COLLECTIVE", nil)) override_networks.split(",").map(&:strip).reject(&:empty?) else get_option("choria.federation.collectives", "").split(",").map(&:strip).reject(&:empty?) end end # Retrieves a DNS resolver # # @note mainly used for testing # @return [Resolv::DNS] def resolver Resolv::DNS.new end # Retrieves the domain from facter networking.domain if facter is found # # Potentially we could use the local facts in mcollective but that's a chicken # and egg and sometimes its only set after initial connection if something like # a cron job generates the yaml cache file # # @return [String,nil] def facter_domain if path = facter_cmd `"#{path}" networking.domain`.chomp end end # Determines the domain to do SRV lookups in # # This is settable using the environment variable # CHORIA_SRV_DOMAIN or choria.srv_domain and defaults # to the domain as reported by facter # # @return [String] def srv_domain env_fetch("CHORIA_SRV_DOMAIN", nil) || get_option("choria.srv_domain", nil) || facter_domain end # Determines the SRV records to look up # # If an option choria.srv_domain is set that will be used else facter will be consulted, # if neither of those provide a domain name a empty list is returned # # @param keys [Array] list of keys to lookup # @return [Array] list of SRV records def srv_records(keys) domain = srv_domain if domain.nil? || domain.empty? Log.warn("Cannot look up SRV records, facter is not functional and choria.srv_domain was not supplied") return [] end keys.map do |key| "%s.%s" % [key, domain] end end # Determines if SRV records should be used # # Setting choria.use_srv to anything other than t, true, yes or 1 will disable # SRV records # # @return [Boolean] def should_use_srv? ["t", "true", "yes", "1"].include?(get_option("choria.use_srv", "1").downcase) end # Query DNS for a series of records # # The given records will be passed through {#srv_records} to figure out the domain to query in. # # Querying of records can be bypassed by setting choria.use_srv to false # # @yield [Hash] each record for modification by the caller # @param records [Array] the records to query without their domain parts # @return [Array] with keys :port, :priority, :weight and :target def query_srv_records(records) unless should_use_srv? Log.info("Skipping SRV record queries due to choria.query_srv_records setting") return [] end answers = Array(srv_records(records)).map do |record| Log.debug("Attempting to resolve SRV record %s" % record) answers = resolver.getresources(record, Resolv::DNS::Resource::IN::SRV) Log.debug("Found %d SRV records for %s" % [answers.size, record]) answers end.flatten answers = answers.sort_by(&:priority).chunk(&:priority).sort answers = sort_srv_answers(answers) answers.map do |result| Log.debug("Found %s:%s with priority %s and weight %s" % [result.target, result.port, result.priority, result.weight]) ans = { :port => result.port, :priority => result.priority, :weight => result.weight, :target => result.target } yield(ans) if block_given? ans end end # Sorts SRV records according to rfc2782 # # @note this is probably still not correct :( so horrible # @param answers [Array] # @return [Array] sorted records def sort_srv_answers(answers) sorted_answers = [] # this is roughly based on the resolv-srv and supposedly mostly rfc2782 compliant answers.each do |_, available| total_weight = available.inject(0) {|a, e| a + e.weight + 1 } until available.empty? selector = Integer(rand * total_weight) + 1 selected_idx = available.find_index do |e| selector -= e.weight + 1 selector <= 0 end selected = available.delete_at(selected_idx) total_weight -= selected.weight + 1 sorted_answers << selected end end sorted_answers end # Create a Net::HTTP instance optionally set up with the Puppet certs # # If the client_private_key and client_public_cert both exist they will # be used to validate the connection # # If the ca_path exist it will be used and full verification will be enabled # # @param server [Hash] as returned by {#try_srv} # @param force_puppet_ssl [boolean] when true will call {#check_ssl_setup} and so force Puppet certs # @return [Net::HTTP] def https(server, force_puppet_ssl=false) Log.debug("Creating new HTTPS connection to %s:%s" % [server[:target], server[:port]]) check_ssl_setup if force_puppet_ssl http = Net::HTTP.new(server[:target], server[:port]) http.use_ssl = true if has_client_private_key? && has_client_public_cert? http.cert = OpenSSL::X509::Certificate.new(File.read(client_public_cert)) http.key = OpenSSL::PKey::RSA.new(File.read(client_private_key)) end if has_ca? http.ca_file = ca_path http.verify_mode = OpenSSL::SSL::VERIFY_PEER else http.verify_mode = OpenSSL::SSL::VERIFY_NONE end http end # Creates a Net::HTTP::Get instance for a path that defaults to accepting JSON # # @param path [String] # @return [Net::HTTP::Get] def http_get(path, headers=nil) headers ||= {} headers = { "Accept" => "application/json", "User-Agent" => "Choria version %s http://choria.io" % VERSION }.merge(headers) Net::HTTP::Get.new(path, headers) end # Creates a Net::HTTP::Post instance for a path that defaults to accepting JSON # # @param path [String] # @return [Net::HTTP::Post] def http_post(path, headers=nil) headers ||= {} headers = { "Accept" => "application/json", "User-Agent" => "Choria version %s http://choria.io" % VERSION }.merge(headers) Net::HTTP::Post.new(path, headers) end # Does a proxied discovery request # # @param query [Hash] Discovery query as per pdbproxy standard # @return [Array] JSON parsed result set # @raise [StandardError] on any failures def proxy_discovery_query(query) transport = https(discovery_server, true) request = http_get("/v1/discover") request.body = query.to_json request["Content-Type"] = "application/json" resp, data = transport.request(request) raise("Failed to make request to Discovery Proxy: %s: %s" % [resp.code, resp.body]) unless resp.code == "200" result = JSON.parse(data || resp.body) result["nodes"] end # Extract certnames from PQL results, deactivated nodes are ignored # # @param results [Array] # @return [Array] list of certnames def pql_extract_certnames(results) results.reject {|n| n["deactivated"]}.map {|n| n["certname"]}.compact end # Performs a PQL query against the configured PuppetDB # # @param query [String] PQL Query # @param only_certnames [Boolean] extract certnames from the results # @return [Array] JSON parsed result set # @raise [StandardError] on any failures def pql_query(query, only_certnames=false) Log.debug("Performing PQL query: %s" % query) path = "/pdb/query/v4?%s" % URI.encode_www_form("query" => query) resp, data = https(puppetdb_server, true).request(http_get(path)) raise("Failed to make request to PuppetDB: %s: %s: %s" % [resp.code, resp.message, resp.body]) unless resp.code == "200" result = JSON.parse(data || resp.body) Log.debug("Found %d records for query %s" % [result.size, query]) only_certnames ? pql_extract_certnames(result) : result end # Checks if all the required SSL files exist # # @param log [Boolean] log warnings when true # @return [Boolean] def have_ssl_files?(log=true) [client_public_cert, client_private_key, ca_path].map do |path| Log.debug("Checking for SSL file %s" % path) if File.exist?(path) true else Log.warn("Cannot find SSL file %s" % path) if log false end end.all? end # Validates a certificate against the CA # # @param pubcert [String] PEM encoded X509 public certificate # @param name [String] name that should be present in the certificate # @param log [Boolean] log warnings when true # @return [String,false] when succesful, the certname else false # @raise [StandardError] in case OpenSSL fails to open the various certificates # @raise [OpenSSL::X509::CertificateError] if the CA is invalid def valid_certificate?(pubcert, name, log=true) return false unless name raise("Cannot find or read the CA in %s, cannot verify public certificate" % ca_path) unless File.readable?(ca_path) certs = parse_pubcert(pubcert, log) return false if certs.empty? incoming = certs.first chain = certs[1..-1] begin ca = OpenSSL::X509::Store.new.add_file(ca_path) rescue OpenSSL::X509::StoreError Log.warn("Failed to load CA from %s: %s: %s" % [ca_path, $!.class, $!.to_s]) if log raise end unless ca.verify(incoming, chain) Log.warn("Failed to verify certificate %s against CA %s in %s" % [incoming.subject.to_s, incoming.issuer.to_s, ca_path]) if log return false end Log.debug("Verified certificate %s against CA %s" % [incoming.subject.to_s, incoming.issuer.to_s]) if log if !remote_signer_configured? && !OpenSSL::SSL.verify_certificate_identity(incoming, name) raise("Could not parse certificate with subject %s as it has no CN part, or name %s invalid" % [incoming.subject.to_s, name]) end name end # Determines if a remote signer is configured # # @return [Boolean] def remote_signer_configured? url = get_option("choria.security.request_signer.url", nil) ![nil, ""].include?(url) end # Utility function to split a chained certificate String into an Array # # @param pemdata [String] PEM encoded certificate # @return [Array] def ssl_split_pem(pemdata) # Chained certificates typically have the public certificate, along # with every intermediate certificiate. # OpenSSL will stop at the first certificate when using OpenSSL::X509::Certificate.new, # so we need to separate them into a list pemdata.scan(/-----BEGIN CERTIFICATE-----.+?-----END CERTIFICATE-----/m) end # Split a string containing chained certificates into an Array of OpenSSL::X509::Certificate. # # @param pemdata [String] # @return [Array] def ssl_parse_chain(pemdata) ssl_split_pem(pemdata).map do |cpem| OpenSSL::X509::Certificate.new(cpem) end end # Parses a public cert # # @param pubcert [String] PEM encoded public certificate # @param log [Boolean] log warnings when true # @return [Array] def parse_pubcert(pubcert, log=true) ssl_parse_chain(pubcert) rescue OpenSSL::X509::CertificateError Log.warn("Received certificate is not a valid x509 certificate: %s: %s" % [$!.class, $!.to_s]) if log nil end # The callerid for the current client # # @return [String] # @raise [Exception] when remote JWT is invalid def callerid PluginManager["security_plugin"].callerid end # Checks all the required SSL files exist # # @param log [Boolean] log warnings when true # @return [Boolean] # @raise [StandardError] on failure def check_ssl_setup(log=true) return true if $choria_unsafe_disable_protocol_security # rubocop:disable Style/GlobalVars return true if anon_tls? raise(UserError, "The Choria client cannot be run as root") if Process.uid == 0 && PluginManager["security_plugin"].initiated_by == :client raise(UserError, "Not all required SSL files exist") unless have_ssl_files?(log) embedded_certname = nil begin embedded_certname = valid_certificate?(File.read(client_public_cert), certname) rescue raise(UserError, "The public certificate was not signed by the configured CA") end unless embedded_certname == certname raise(UserError, "The certname %s found in %s does not match the configured certname of %s" % [embedded_certname, client_public_cert, certname]) end true end # Resolves server lists based on config and SRV records # # Attempts to find server in the following order: # # * Configured hosts in `config_option` # * SRV lookups of `srv_records` # * Defaults # * nil otherwise # # @param config_option [String] config to lookup # @param srv_records [Array] list of SRV records to query # @param default_host [String] host to use when not found # @param default_port [String] port to use when not found # @return [Array, nil] groups of host and port pairs def server_resolver(config_option, srv_records, default_host=nil, default_port=nil) if servers = get_option(config_option, nil) hosts = servers.split(",").map do |server| server.split(":").map(&:strip) end return hosts end srv_answers = query_srv_records(srv_records) unless srv_answers.empty? hosts = srv_answers.map do |answer| [answer[:target], answer[:port]] end return hosts end [[default_host, default_port]] if default_host && default_port end # Finds the middleware hosts in config or DNS # # Attempts to find servers in the following order: # # * connects.ngs.global if configured to be ngs and empty choria.middleware_hosts # * Any federation servers if in a federation # * Configured hosts in choria.middleware_hosts # * SRV lookups in _mcollective-server._tcp and _x-puppet-mcollective._tcp # * Supplied defaults # # Eventually it's intended that other middleware might be supported # this would provide a single way to configure them all # # @param default_host [String] default hostname # @param default_port [String] default port # @return [Array>] groups of host and port def middleware_servers(default_host="puppet", default_port="4222") return [["connect.ngs.global", "4222"]] if ngs? && !has_option?("choria.middleware_hosts") if federated? && federation = federation_middleware_servers return federation end server_resolver("choria.middleware_hosts", ["_mcollective-server._tcp", "_x-puppet-mcollective._tcp"], default_host, default_port) end # Looks for federation middleware servers when federated # # Attempts to find servers in the following order: # # * Configured hosts in choria.federation_middleware_hosts # * SRV lookups in _mcollective-federation_server._tcp and _x-puppet-mcollective_federation._tcp # # @note you'd still want to only get your middleware servers from {#middleware_servers} # @return [Array,nil] groups of host and port, nil when not found def federation_middleware_servers server_resolver("choria.federation_middleware_hosts", ["_mcollective-federation_server._tcp", "_x-puppet-mcollective_federation._tcp"]) end # Determines if servers should be randomized # # @return [Boolean] def randomize_middleware_servers? Util.str_to_bool(get_option("choria.randomize_middleware_hosts", "true")) end # Attempts to look up some SRV records falling back to defaults # # When given a array of multiple names it will try each name individually # and check if it resolved to a answer, if it did it will use that one. # Else it will move to the next. In this way you can prioritise one # record over another like puppetdb over puppet and faill back to defaults. # # This is a pretty naive implementation that right now just returns # the first result, the correct behaviour needs to be determined but # for now this gets us going with easily iterable code. # # These names are mainly being used by {#https} so in theory it would # be quite easy to support multiple results with fall back etc, but # I am not really sure what would be the best behaviour here # # @param names [Array, String] list of names to lookup without the domain # @param default_target [String] default for the returned :target # @param default_port [String] default for the returned :port # @return [Hash] with :target and :port def try_srv(names, default_target, default_port) srv_answers = Array(names).map do |name| answer = query_srv_records([name]) answer.empty? ? nil : answer end.compact.flatten if srv_answers.empty? {:target => default_target, :port => default_port} else {:target => srv_answers[0][:target].to_s, :port => srv_answers[0][:port]} end end # The Puppet server to connect to # # Will consult SRV records for _x-puppet._tcp.example.net first then # configurable using choria.puppetserver_host and choria.puppetserver_port # defaults to puppet:8140. # # @return [Hash] with :target and :port def puppet_server d_host = get_option("choria.puppetserver_host", "puppet") d_port = get_option("choria.puppetserver_port", "8140") try_srv(["_x-puppet._tcp"], d_host, d_port) end # The Puppet server to connect to # # Will consult _x-puppet-ca._tcp.example.net then _x-puppet._tcp.example.net # then configurable using choria.puppetca_host, defaults to puppet:8140 # # @return [Hash] with :target and :port def puppetca_server d_port = get_option("choria.puppetca_port", "8140") if @ca {:target => @ca, :port => d_port} else d_host = get_option("choria.puppetca_host", "puppet") try_srv(["_x-puppet-ca._tcp", "_x-puppet._tcp"], d_host, d_port) end end # The PuppetDB server to connect to # # Use choria.puppetdb_host if set, otherwise query # _x-puppet-db._tcp.example.net then _x-puppet._tcp.example.net if SRV # lookup is enabled, and fallback to puppet:8081 if nothing else worked. # # @return [Hash] with :target and :port def puppetdb_server d_port = get_option("choria.puppetdb_port", "8081") answer = { :target => get_option("choria.puppetdb_host", nil), :port => d_port } return answer if answer[:target] answer = try_srv(["_x-puppet-db._tcp"], nil, nil) return answer if answer[:target] # In the case where we take _x-puppet._tcp SRV records we unfortunately have # to force the port else it uses the one from Puppet which will 404 answer = try_srv(["_x-puppet._tcp"], "puppet", d_port) answer[:port] = d_port answer end # Looks for discovery proxy servers # # Attempts to find servers in the following order: # # * If choria.discovery_proxy is set to false, returns nil # * Configured hosts in choria.discovery_proxies # * SRV lookups in _mcollective-discovery._tcp # # @return [Hash] with :target and :port def discovery_server return unless proxied_discovery? d_host = get_option("choria.discovery_host", "puppet") d_port = get_option("choria.discovery_port", "8085") try_srv(["_mcollective-discovery._tcp"], d_host, d_port) end # Determines if this is using a discovery proxy # # @return [Boolean] def proxied_discovery? has_option?("choria.discovery_host") || has_option?("choria.discovery_port") || Util.str_to_bool(get_option("choria.discovery_proxy", "false")) end # The certname of the current context # # In the case of root that would be the configured `identity` # for non root it would a string made up of the current username # as determined by the USER environment variable or the configured # `identity` # # At present windows clients are probably not supported automatically # as they will default to the certificate based on identity. Same # as root. Windows will have to rely on the environment override # until we can figure out what the best behaviour is # # In all cases the certname can be overridden using the `MCOLLECTIVE_CERTNAME` # environment variable # # @return [String] def certname if Process.uid == 0 || Util.windows? certname = @config.identity else certname = "%s.mcollective" % [env_fetch("USER", @config.identity)] end env_fetch("MCOLLECTIVE_CERTNAME", certname) end # Initialises Puppet if needed and retrieve a config setting # # @param setting [Symbol] a Puppet setting name # @return [String] def puppet_setting(setting) require "puppet" unless Puppet.settings.app_defaults_initialized? Puppet.settings.preferred_run_mode = :agent Puppet.settings.initialize_global_settings([]) Puppet.settings.initialize_app_defaults(Puppet::Settings.app_defaults_for_run_mode(Puppet.run_mode)) Puppet.push_context(Puppet.base_context(Puppet.settings)) end Puppet.settings[setting] end # Creates a SSL Context which includes the AIO SSL files # # @return [OpenSSL::SSL::SSLContext] def ssl_context context = OpenSSL::SSL::SSLContext.new context.ca_file = ca_path context.ssl_version = :TLSv1_2 # rubocop:disable Naming/VariableNumber if anon_tls? context.verify_mode = OpenSSL::SSL::VERIFY_NONE return context end public_cert = File.read(client_public_cert) private_key = File.read(client_private_key) cert_chain = ssl_parse_chain(public_cert) cert = cert_chain.first key = OpenSSL::PKey::RSA.new(private_key) extra_chain_cert = cert_chain[1..-1] if OpenSSL::SSL::SSLContext.method_defined?(:add_certificate) context.add_certificate(cert, key, extra_chain_cert) else context.cert = OpenSSL::X509::Certificate.new(File.read(client_public_cert)) context.key = OpenSSL::PKey::RSA.new(File.read(client_private_key)) context.extra_chain_cert = extra_chain_cert end context.verify_mode = OpenSSL::SSL::VERIFY_PEER context end # The directory where SSL related files live # # This is configurable using choria.ssldir which should be a # path expandable using File.expand_path # # On Windows or when running as root Puppet settings will be consulted # but when running as a normal user it will default to the AIO path # when not configured # # @return [String] def ssl_dir @_ssl_dir ||= if has_option?("choria.ssldir") File.expand_path(get_option("choria.ssldir")) elsif Util.windows? || Process.uid == 0 puppet_setting(:ssldir) else File.expand_path("~/.puppetlabs/etc/puppet/ssl") end end # Determines the security provider def security_provider get_option("security.provider", "puppet") end # Determines if the file security provider is enabled def file_security? security_provider == "file" end # Determines if the puppet security provider is enabled def puppet_security? security_provider == "puppet" end # Expands full paths with special handling for empty string # # File.expand_path will expand `""` to cwd, this is not good for # what we need in many cases so this returns `""` in that case # # @param path [String] the unexpanded path # @return [String] `""` when empty string was given def expand_path(path) return "" if path == "" File.expand_path(path) end # The path to a client public certificate # # @note paths determined by Puppet AIO packages # @return [String] def client_public_cert return expand_path(get_option("security.file.certificate", "")) if file_security? File.join(ssl_dir, "certs", "%s.pem" % certname) end # Determines if teh client_public_cert exist # # @return [Boolean] def has_client_public_cert? File.exist?(client_public_cert) end # The path to a client private key # # @note paths determined by Puppet AIO packages # @return [String] def client_private_key return expand_path(get_option("security.file.key", "")) if file_security? File.join(ssl_dir, "private_keys", "%s.pem" % certname) end # Determines if the client_private_key exist # # @return [Boolean] def has_client_private_key? File.exist?(client_private_key) end # The path to the CA # # @return [String] def ca_path return expand_path(get_option("security.file.ca", "")) if file_security? File.join(ssl_dir, "certs", "ca.pem") end # Determines if Choria is configured for anonymous TLS mode # # @return [Boolean] def anon_tls? remote_signer_configured? && Util.str_to_bool(get_option("security.client_anon_tls", "false")) end # Determines if the CA exist # # @return [Boolean] def has_ca? File.exist?(ca_path) end # The path to a CSR for this user # # @return [String] def csr_path return "" if file_security? File.join(ssl_dir, "certificate_requests", "%s.pem" % certname) end # Determines if the CSR exist # # @return [Boolean] def has_csr? File.exist?(csr_path) end # Searches the PATH for an executable command # # @param command [String] a command to search for # @return [String,nil] the path to the command or nil def which(command) exts = Array(env_fetch("PATHEXT", "").split(";")) exts << "" if exts.empty? env_fetch("PATH", "").split(File::PATH_SEPARATOR).each do |path| exts.each do |ext| exe = File.join(path, "%s%s" % [command, ext]) return exe if File.executable?(exe) && !File.directory?(exe) end end nil end # Searches the machine for a working facter # # It checks AIO path first and then attempts to find it in PATH and supports both # unix and windows # # @return [String,nil] def facter_cmd return "/opt/puppetlabs/bin/facter" if File.executable?("/opt/puppetlabs/bin/facter") which("facter") end # Gets a config option # # @param opt [String] config option to look up # @param default [Object] default to return when not found # @return [Object, Proc] the found data or default. When it's a proc the proc will be called only when needed # @raise [StandardError] when no default is given and option is not found def get_option(opt, default=:_unset) return @config.pluginconf[opt] if has_option?(opt) unless default == :_unset if default.is_a?(Proc) return default.call else return default end end raise(UserError, "No plugin.%s configuration option given" % opt) end # Determines if a config option is set # # @param opt [String] config option to look up # @return [Boolean] def has_option?(opt) @config.pluginconf.include?(opt) end def env_fetch(key, default=nil) ENV.fetch(key, default) end end end end