lib/bolt/applicator.rb in bolt-1.49.0 vs lib/bolt/applicator.rb in bolt-2.0.0
- old
+ new
@@ -81,108 +81,53 @@
Bolt::Task.new('apply_helpers::query_resources', metadata, [file])
end
end
- def compile(target, ast, plan_vars)
+ def compile(target, catalog_input)
# This simplified Puppet node object is what .local uses to determine the
# certname of the target
node = Puppet::Node.from_data_hash('name' => target.name,
'parameters' => { 'clientcert' => target.name })
trusted = Puppet::Context::TrustedInformation.local(node)
- facts = @inventory.facts(target).merge('bolt' => true)
-
- catalog_input = {
- code_ast: ast,
- modulepath: @modulepath,
- pdb_config: @pdb_client.config.to_hash,
- hiera_config: @hiera_config,
- target: {
- name: target.name,
- facts: facts,
- variables: @inventory.vars(target).merge(plan_vars),
- trusted: trusted.to_h
- },
- inventory: @inventory.data_hash
- }
-
- bolt_catalog_exe = File.join(libexec, 'bolt_catalog')
- old_path = ENV['PATH']
- ENV['PATH'] = "#{RbConfig::CONFIG['bindir']}#{File::PATH_SEPARATOR}#{old_path}"
- out, err, stat = Open3.capture3('ruby', bolt_catalog_exe, 'compile', stdin_data: catalog_input.to_json)
- ENV['PATH'] = old_path
-
- # stderr may contain formatted logs from Puppet's logger or other errors.
- # Print them in order, but handle them separately. Anything not a formatted log is assumed
- # to be an error message.
- logs = err.lines.map do |l|
- begin
- JSON.parse(l)
- rescue StandardError
- l
- end
- end
- logs.each do |log|
- if log.is_a?(String)
- @logger.error(log.chomp)
- else
- log.map { |k, v| [k.to_sym, v] }.each do |level, msg|
- bolt_level = Bolt::Util::PuppetLogLevel::MAPPING[level]
- @logger.send(bolt_level, "#{target.name}: #{msg.chomp}")
- end
- end
- end
-
- raise(ApplyError, target.name) unless stat.success?
- JSON.parse(out)
- end
-
- def future_compile(target, catalog_input)
- # This simplified Puppet node object is what .local uses to determine the
- # certname of the target
- node = Puppet::Node.from_data_hash('name' => target.name,
- 'parameters' => { 'clientcert' => target.name })
- trusted = Puppet::Context::TrustedInformation.local(node)
catalog_input[:target] = {
name: target.name,
facts: @inventory.facts(target).merge('bolt' => true),
variables: @inventory.vars(target),
trusted: trusted.to_h
}
- # rubocop:disable Style/GlobalVars
- catalog_input[:future] = $future
- # rubocop:enable Style/GlobalVars
bolt_catalog_exe = File.join(libexec, 'bolt_catalog')
old_path = ENV['PATH']
ENV['PATH'] = "#{RbConfig::CONFIG['bindir']}#{File::PATH_SEPARATOR}#{old_path}"
out, err, stat = Open3.capture3('ruby', bolt_catalog_exe, 'compile', stdin_data: catalog_input.to_json)
ENV['PATH'] = old_path
- # stderr may contain formatted logs from Puppet's logger or other errors.
- # Print them in order, but handle them separately. Anything not a formatted log is assumed
- # to be an error message.
- logs = err.lines.map do |l|
- begin
- JSON.parse(l)
- rescue StandardError
- l
- end
+ # Any messages logged by Puppet will be on stderr as JSON hashes, so we
+ # parse those and store them here. Any message on stderr that is not
+ # properly JSON formatted is assumed to be an error message. If
+ # compilation was successful, we print the logs as they may include
+ # important warnings. If compilation failed, we don't print the logs as
+ # they are likely redundant with the error that caused the failure, which
+ # will be handled separately.
+ logs = err.lines.map do |line|
+ JSON.parse(line)
+ rescue JSON::ParserError
+ { 'level' => 'err', 'message' => line }
end
- logs.each do |log|
- if log.is_a?(String)
- @logger.error(log.chomp)
- else
- log.map { |k, v| [k.to_sym, v] }.each do |level, msg|
- bolt_level = Bolt::Util::PuppetLogLevel::MAPPING[level]
- @logger.send(bolt_level, "#{target.name}: #{msg.chomp}")
- end
+
+ result = JSON.parse(out)
+ if stat.success?
+ logs.each do |log|
+ bolt_level = Bolt::Util::PuppetLogLevel::MAPPING[log['level'].to_sym]
+ message = log['message'].chomp
+ @logger.send(bolt_level, "#{target.name}: #{message}")
end
+ result
+ else
+ raise ApplyError.new(target.name, result['message'])
end
-
- raise(ApplyError, target.name) unless stat.success?
- JSON.parse(out)
end
def validate_hiera_config(hiera_config)
if File.exist?(File.path(hiera_config))
data = File.open(File.path(hiera_config), "r:UTF-8") { |f| YAML.safe_load(f.read, [Symbol]) }
@@ -230,67 +175,74 @@
end
def apply_ast(raw_ast, targets, options, plan_vars = {})
ast = Puppet::Pops::Serialization::ToDataConverter.convert(raw_ast, rich_data: true, symbol_to_string: true)
- # rubocop:disable Style/GlobalVars
- if $future
- # Serialize as pcore for *Result* objects
- plan_vars = Puppet::Pops::Serialization::ToDataConverter.convert(plan_vars,
- rich_data: true,
- symbol_as_string: true,
- type_by_reference: true,
- local_reference: false)
- scope = {
- code_ast: ast,
- modulepath: @modulepath,
- pdb_config: @pdb_client.config.to_hash,
- hiera_config: @hiera_config,
- plan_vars: plan_vars,
- # This data isn't available on the target config hash
- config: @inventory.config.transport_data_get
- }
- end
- # rubocop:enable Style/GlobalVars
+ # Serialize as pcore for *Result* objects
+ plan_vars = Puppet::Pops::Serialization::ToDataConverter.convert(plan_vars,
+ rich_data: true,
+ symbol_as_string: true,
+ type_by_reference: true,
+ local_reference: false)
+ scope = {
+ code_ast: ast,
+ modulepath: @modulepath,
+ pdb_config: @pdb_client.config.to_hash,
+ hiera_config: @hiera_config,
+ plan_vars: plan_vars,
+ # This data isn't available on the target config hash
+ config: @inventory.config.transport_data_get
+ }
+
description = options[:description] || 'apply catalog'
r = @executor.log_action(description, targets) do
futures = targets.map do |target|
Concurrent::Future.execute(executor: @pool) do
@executor.with_node_logging("Compiling manifest block", [target]) do
- # rubocop:disable Style/GlobalVars
- $future ? future_compile(target, scope) : compile(target, ast, plan_vars)
- # rubocop:enable Style/GlobalVars
+ compile(target, scope)
end
end
end
result_promises = targets.zip(futures).flat_map do |target, future|
@executor.queue_execute([target]) do |transport, batch|
@executor.with_node_logging("Applying manifest block", batch) do
catalog = future.value
- raise future.reason if future.rejected?
+ if future.rejected?
+ batch.map do |batch_target|
+ # If an unhandled exception occurred, wrap it in an ApplyError
+ error = if future.reason.is_a?(Bolt::ApplyError)
+ future.reason
+ else
+ Bolt::ApplyError.new(batch_target, future.reason.message)
+ end
+ result = Bolt::ApplyResult.new(batch_target, error: error.to_h)
+ @executor.publish_event(type: :node_result, result: result)
+ result
+ end
+ else
+ arguments = {
+ 'catalog' => Puppet::Pops::Types::PSensitiveType::Sensitive.new(catalog),
+ 'plugins' => Puppet::Pops::Types::PSensitiveType::Sensitive.new(plugins),
+ 'apply_settings' => @apply_settings,
+ '_task' => catalog_apply_task.name,
+ '_noop' => options[:noop]
+ }
- arguments = {
- 'catalog' => Puppet::Pops::Types::PSensitiveType::Sensitive.new(catalog),
- 'plugins' => Puppet::Pops::Types::PSensitiveType::Sensitive.new(plugins),
- 'apply_settings' => @apply_settings,
- '_task' => catalog_apply_task.name,
- '_noop' => options[:noop]
- }
-
- callback = proc do |event|
- if event[:type] == :node_result
- event = event.merge(result: ApplyResult.from_task_result(event[:result]))
+ callback = proc do |event|
+ if event[:type] == :node_result
+ event = event.merge(result: ApplyResult.from_task_result(event[:result]))
+ end
+ @executor.publish_event(event)
end
- @executor.publish_event(event)
- end
- # Respect the run_as default set on the executor
- options[:run_as] = @executor.run_as if @executor.run_as && !options.key?(:run_as)
+ # Respect the run_as default set on the executor
+ options[:run_as] = @executor.run_as if @executor.run_as && !options.key?(:run_as)
- results = transport.batch_task(batch, catalog_apply_task, arguments, options, &callback)
- Array(results).map { |result| ApplyResult.from_task_result(result) }
+ results = transport.batch_task(batch, catalog_apply_task, arguments, options, &callback)
+ Array(results).map { |result| ApplyResult.from_task_result(result) }
+ end
end
end
end
@executor.await_results(result_promises)