lib/bolt/applicator.rb in bolt-1.49.0 vs lib/bolt/applicator.rb in bolt-2.0.0

- old
+ new

@@ -81,108 +81,53 @@ Bolt::Task.new('apply_helpers::query_resources', metadata, [file]) end end - def compile(target, ast, plan_vars) + def compile(target, catalog_input) # This simplified Puppet node object is what .local uses to determine the # certname of the target node = Puppet::Node.from_data_hash('name' => target.name, 'parameters' => { 'clientcert' => target.name }) trusted = Puppet::Context::TrustedInformation.local(node) - facts = @inventory.facts(target).merge('bolt' => true) - - catalog_input = { - code_ast: ast, - modulepath: @modulepath, - pdb_config: @pdb_client.config.to_hash, - hiera_config: @hiera_config, - target: { - name: target.name, - facts: facts, - variables: @inventory.vars(target).merge(plan_vars), - trusted: trusted.to_h - }, - inventory: @inventory.data_hash - } - - bolt_catalog_exe = File.join(libexec, 'bolt_catalog') - old_path = ENV['PATH'] - ENV['PATH'] = "#{RbConfig::CONFIG['bindir']}#{File::PATH_SEPARATOR}#{old_path}" - out, err, stat = Open3.capture3('ruby', bolt_catalog_exe, 'compile', stdin_data: catalog_input.to_json) - ENV['PATH'] = old_path - - # stderr may contain formatted logs from Puppet's logger or other errors. - # Print them in order, but handle them separately. Anything not a formatted log is assumed - # to be an error message. - logs = err.lines.map do |l| - begin - JSON.parse(l) - rescue StandardError - l - end - end - logs.each do |log| - if log.is_a?(String) - @logger.error(log.chomp) - else - log.map { |k, v| [k.to_sym, v] }.each do |level, msg| - bolt_level = Bolt::Util::PuppetLogLevel::MAPPING[level] - @logger.send(bolt_level, "#{target.name}: #{msg.chomp}") - end - end - end - - raise(ApplyError, target.name) unless stat.success? - JSON.parse(out) - end - - def future_compile(target, catalog_input) - # This simplified Puppet node object is what .local uses to determine the - # certname of the target - node = Puppet::Node.from_data_hash('name' => target.name, - 'parameters' => { 'clientcert' => target.name }) - trusted = Puppet::Context::TrustedInformation.local(node) catalog_input[:target] = { name: target.name, facts: @inventory.facts(target).merge('bolt' => true), variables: @inventory.vars(target), trusted: trusted.to_h } - # rubocop:disable Style/GlobalVars - catalog_input[:future] = $future - # rubocop:enable Style/GlobalVars bolt_catalog_exe = File.join(libexec, 'bolt_catalog') old_path = ENV['PATH'] ENV['PATH'] = "#{RbConfig::CONFIG['bindir']}#{File::PATH_SEPARATOR}#{old_path}" out, err, stat = Open3.capture3('ruby', bolt_catalog_exe, 'compile', stdin_data: catalog_input.to_json) ENV['PATH'] = old_path - # stderr may contain formatted logs from Puppet's logger or other errors. - # Print them in order, but handle them separately. Anything not a formatted log is assumed - # to be an error message. - logs = err.lines.map do |l| - begin - JSON.parse(l) - rescue StandardError - l - end + # Any messages logged by Puppet will be on stderr as JSON hashes, so we + # parse those and store them here. Any message on stderr that is not + # properly JSON formatted is assumed to be an error message. If + # compilation was successful, we print the logs as they may include + # important warnings. If compilation failed, we don't print the logs as + # they are likely redundant with the error that caused the failure, which + # will be handled separately. + logs = err.lines.map do |line| + JSON.parse(line) + rescue JSON::ParserError + { 'level' => 'err', 'message' => line } end - logs.each do |log| - if log.is_a?(String) - @logger.error(log.chomp) - else - log.map { |k, v| [k.to_sym, v] }.each do |level, msg| - bolt_level = Bolt::Util::PuppetLogLevel::MAPPING[level] - @logger.send(bolt_level, "#{target.name}: #{msg.chomp}") - end + + result = JSON.parse(out) + if stat.success? + logs.each do |log| + bolt_level = Bolt::Util::PuppetLogLevel::MAPPING[log['level'].to_sym] + message = log['message'].chomp + @logger.send(bolt_level, "#{target.name}: #{message}") end + result + else + raise ApplyError.new(target.name, result['message']) end - - raise(ApplyError, target.name) unless stat.success? - JSON.parse(out) end def validate_hiera_config(hiera_config) if File.exist?(File.path(hiera_config)) data = File.open(File.path(hiera_config), "r:UTF-8") { |f| YAML.safe_load(f.read, [Symbol]) } @@ -230,67 +175,74 @@ end def apply_ast(raw_ast, targets, options, plan_vars = {}) ast = Puppet::Pops::Serialization::ToDataConverter.convert(raw_ast, rich_data: true, symbol_to_string: true) - # rubocop:disable Style/GlobalVars - if $future - # Serialize as pcore for *Result* objects - plan_vars = Puppet::Pops::Serialization::ToDataConverter.convert(plan_vars, - rich_data: true, - symbol_as_string: true, - type_by_reference: true, - local_reference: false) - scope = { - code_ast: ast, - modulepath: @modulepath, - pdb_config: @pdb_client.config.to_hash, - hiera_config: @hiera_config, - plan_vars: plan_vars, - # This data isn't available on the target config hash - config: @inventory.config.transport_data_get - } - end - # rubocop:enable Style/GlobalVars + # Serialize as pcore for *Result* objects + plan_vars = Puppet::Pops::Serialization::ToDataConverter.convert(plan_vars, + rich_data: true, + symbol_as_string: true, + type_by_reference: true, + local_reference: false) + scope = { + code_ast: ast, + modulepath: @modulepath, + pdb_config: @pdb_client.config.to_hash, + hiera_config: @hiera_config, + plan_vars: plan_vars, + # This data isn't available on the target config hash + config: @inventory.config.transport_data_get + } + description = options[:description] || 'apply catalog' r = @executor.log_action(description, targets) do futures = targets.map do |target| Concurrent::Future.execute(executor: @pool) do @executor.with_node_logging("Compiling manifest block", [target]) do - # rubocop:disable Style/GlobalVars - $future ? future_compile(target, scope) : compile(target, ast, plan_vars) - # rubocop:enable Style/GlobalVars + compile(target, scope) end end end result_promises = targets.zip(futures).flat_map do |target, future| @executor.queue_execute([target]) do |transport, batch| @executor.with_node_logging("Applying manifest block", batch) do catalog = future.value - raise future.reason if future.rejected? + if future.rejected? + batch.map do |batch_target| + # If an unhandled exception occurred, wrap it in an ApplyError + error = if future.reason.is_a?(Bolt::ApplyError) + future.reason + else + Bolt::ApplyError.new(batch_target, future.reason.message) + end + result = Bolt::ApplyResult.new(batch_target, error: error.to_h) + @executor.publish_event(type: :node_result, result: result) + result + end + else + arguments = { + 'catalog' => Puppet::Pops::Types::PSensitiveType::Sensitive.new(catalog), + 'plugins' => Puppet::Pops::Types::PSensitiveType::Sensitive.new(plugins), + 'apply_settings' => @apply_settings, + '_task' => catalog_apply_task.name, + '_noop' => options[:noop] + } - arguments = { - 'catalog' => Puppet::Pops::Types::PSensitiveType::Sensitive.new(catalog), - 'plugins' => Puppet::Pops::Types::PSensitiveType::Sensitive.new(plugins), - 'apply_settings' => @apply_settings, - '_task' => catalog_apply_task.name, - '_noop' => options[:noop] - } - - callback = proc do |event| - if event[:type] == :node_result - event = event.merge(result: ApplyResult.from_task_result(event[:result])) + callback = proc do |event| + if event[:type] == :node_result + event = event.merge(result: ApplyResult.from_task_result(event[:result])) + end + @executor.publish_event(event) end - @executor.publish_event(event) - end - # Respect the run_as default set on the executor - options[:run_as] = @executor.run_as if @executor.run_as && !options.key?(:run_as) + # Respect the run_as default set on the executor + options[:run_as] = @executor.run_as if @executor.run_as && !options.key?(:run_as) - results = transport.batch_task(batch, catalog_apply_task, arguments, options, &callback) - Array(results).map { |result| ApplyResult.from_task_result(result) } + results = transport.batch_task(batch, catalog_apply_task, arguments, options, &callback) + Array(results).map { |result| ApplyResult.from_task_result(result) } + end end end end @executor.await_results(result_promises)