lib/bolt/executor.rb in bolt-0.18.1 vs lib/bolt/executor.rb in bolt-0.18.2
- old
+ new
@@ -17,20 +17,16 @@
attr_accessor :run_as
def initialize(config = Bolt::Config.new, noop = nil, plan_logging = false)
@config = config
@logger = Logging.logger[self]
+ @plan_logging = plan_logging
@transports = Bolt::TRANSPORTS.each_with_object({}) do |(key, val), coll|
coll[key.to_s] = Concurrent::Delay.new { val.new }
end
- # If a specific elevated log level has been requested, honor that.
- # Otherwise, escalate the log level to "info" if running in plan mode, so
- # that certain progress messages will be visible.
- default_log_level = plan_logging ? :info : :notice
- @logger.level = @config[:log_level] || default_log_level
@noop = noop
@run_as = nil
@pool = Concurrent::CachedThreadPool.new(max_threads: @config[:concurrency])
@logger.debug { "Started with #{@config[:concurrency]} max thread(s)" }
@notifier = Bolt::Notifier.new
@@ -41,15 +37,15 @@
# If there was an error creating the transport, ensure it gets thrown
impl.no_error!
impl.value
end
- def summary(action, object, result)
+ def summary(description, result)
fc = result.error_set.length
npl = result.length == 1 ? '' : 's'
fpl = fc == 1 ? '' : 's'
- "Ran #{action} '#{object}' on #{result.length} node#{npl} with #{fc} failure#{fpl}"
+ "Finished: #{description} on #{result.length} node#{npl} with #{fc} failure#{fpl}"
end
private :summary
# Execute the given block on a list of nodes in parallel, one thread per "batch".
#
@@ -95,77 +91,90 @@
end
end
ResultSet.new(promises.map(&:value))
end
+ # When running a plan, info messages like starting a task are promoted to notice.
+ def log_action(msg)
+ @plan_logging ? @logger.notice(msg) : @logger.info(msg)
+ end
+ private :log_action
+
+ def with_node_logging(description, batch)
+ @logger.info("#{description} on #{batch.map(&:uri)}")
+ result = yield
+ @logger.info(result.to_json)
+ result
+ end
+ private :with_node_logging
+
def run_command(targets, command, options = {}, &callback)
- @logger.info("Starting command run '#{command}' on #{targets.map(&:uri)}")
+ description = options.fetch('_description', "command '#{command}'")
+ log_action("Starting: #{description} on #{targets.map(&:uri)}")
notify = proc { |event| @notifier.notify(callback, event) if callback }
options = { '_run_as' => run_as }.merge(options) if run_as
results = batch_execute(targets) do |transport, batch|
- transport.batch_command(batch, command, options, ¬ify)
+ with_node_logging("Running command '#{command}'", batch) do
+ transport.batch_command(batch, command, options, ¬ify)
+ end
end
- @logger.info(summary('command', command, results))
+ log_action(summary(description, results))
@notifier.shutdown
results
end
def run_script(targets, script, arguments, options = {}, &callback)
- @logger.info("Starting script run #{script} on #{targets.map(&:uri)}")
- @logger.debug("Arguments: #{arguments}")
+ description = options.fetch('_description', "script #{script}")
+ log_action("Starting: #{description} on #{targets.map(&:uri)}")
+
notify = proc { |event| @notifier.notify(callback, event) if callback }
options = { '_run_as' => run_as }.merge(options) if run_as
results = batch_execute(targets) do |transport, batch|
- transport.batch_script(batch, script, arguments, options, ¬ify)
+ with_node_logging("Running script #{script} with '#{arguments}'", batch) do
+ transport.batch_script(batch, script, arguments, options, ¬ify)
+ end
end
- @logger.info(summary('script', script, results))
+ log_action(summary(description, results))
@notifier.shutdown
results
end
def run_task(targets, task, arguments, options = {}, &callback)
- task_name = task.name
- @logger.info("Starting task #{task_name} on #{targets.map(&:uri)}")
- @logger.debug("Arguments: #{arguments} Input method: #{task.input_method}")
+ description = options.fetch('_description', "task #{task.name}")
+ log_action("Starting: #{description} on #{targets.map(&:uri)}")
+
notify = proc { |event| @notifier.notify(callback, event) if callback }
options = { '_run_as' => run_as }.merge(options) if run_as
results = batch_execute(targets) do |transport, batch|
- transport.batch_task(batch, task, arguments, options, ¬ify)
+ with_node_logging("Running task #{task.name} with '#{arguments}' via #{task.input_method}", batch) do
+ transport.batch_task(batch, task, arguments, options, ¬ify)
+ end
end
- @logger.info(summary('task', task_name, results))
+ log_action(summary(description, results))
@notifier.shutdown
results
end
def file_upload(targets, source, destination, options = {}, &callback)
- @logger.info("Starting file upload from #{source} to #{destination} on #{targets.map(&:uri)}")
+ description = options.fetch('_description', "file upload from #{source} to #{destination}")
+ log_action("Starting: #{description} on #{targets.map(&:uri)}")
notify = proc { |event| @notifier.notify(callback, event) if callback }
options = { '_run_as' => run_as }.merge(options) if run_as
results = batch_execute(targets) do |transport, batch|
- transport.batch_upload(batch, source, destination, options, ¬ify)
+ with_node_logging("Uploading file #{source} to #{destination}", batch) do
+ transport.batch_upload(batch, source, destination, options, ¬ify)
+ end
end
- @logger.info(summary('upload', source, results))
+ log_action(summary(description, results))
@notifier.shutdown
results
- end
-
- def puppetdb_client
- return @puppetdb_client if @puppetdb_client
- puppetdb_config = Bolt::PuppetDB::Config.new(nil, @config.puppetdb)
- @puppetdb_client = Bolt::PuppetDB::Client.from_config(puppetdb_config)
- end
-
- def puppetdb_fact(certnames)
- puppetdb_client.facts_for_node(certnames)
- rescue StandardError => e
- raise Bolt::CLIError, "Could not retrieve targets from PuppetDB: #{e}"
end
end
end