lib/bolt/executor.rb in bolt-0.18.1 vs lib/bolt/executor.rb in bolt-0.18.2

- old
+ new

@@ -17,20 +17,16 @@ attr_accessor :run_as def initialize(config = Bolt::Config.new, noop = nil, plan_logging = false) @config = config @logger = Logging.logger[self] + @plan_logging = plan_logging @transports = Bolt::TRANSPORTS.each_with_object({}) do |(key, val), coll| coll[key.to_s] = Concurrent::Delay.new { val.new } end - # If a specific elevated log level has been requested, honor that. - # Otherwise, escalate the log level to "info" if running in plan mode, so - # that certain progress messages will be visible. - default_log_level = plan_logging ? :info : :notice - @logger.level = @config[:log_level] || default_log_level @noop = noop @run_as = nil @pool = Concurrent::CachedThreadPool.new(max_threads: @config[:concurrency]) @logger.debug { "Started with #{@config[:concurrency]} max thread(s)" } @notifier = Bolt::Notifier.new @@ -41,15 +37,15 @@ # If there was an error creating the transport, ensure it gets thrown impl.no_error! impl.value end - def summary(action, object, result) + def summary(description, result) fc = result.error_set.length npl = result.length == 1 ? '' : 's' fpl = fc == 1 ? '' : 's' - "Ran #{action} '#{object}' on #{result.length} node#{npl} with #{fc} failure#{fpl}" + "Finished: #{description} on #{result.length} node#{npl} with #{fc} failure#{fpl}" end private :summary # Execute the given block on a list of nodes in parallel, one thread per "batch". # @@ -95,77 +91,90 @@ end end ResultSet.new(promises.map(&:value)) end + # When running a plan, info messages like starting a task are promoted to notice. + def log_action(msg) + @plan_logging ? @logger.notice(msg) : @logger.info(msg) + end + private :log_action + + def with_node_logging(description, batch) + @logger.info("#{description} on #{batch.map(&:uri)}") + result = yield + @logger.info(result.to_json) + result + end + private :with_node_logging + def run_command(targets, command, options = {}, &callback) - @logger.info("Starting command run '#{command}' on #{targets.map(&:uri)}") + description = options.fetch('_description', "command '#{command}'") + log_action("Starting: #{description} on #{targets.map(&:uri)}") notify = proc { |event| @notifier.notify(callback, event) if callback } options = { '_run_as' => run_as }.merge(options) if run_as results = batch_execute(targets) do |transport, batch| - transport.batch_command(batch, command, options, &notify) + with_node_logging("Running command '#{command}'", batch) do + transport.batch_command(batch, command, options, &notify) + end end - @logger.info(summary('command', command, results)) + log_action(summary(description, results)) @notifier.shutdown results end def run_script(targets, script, arguments, options = {}, &callback) - @logger.info("Starting script run #{script} on #{targets.map(&:uri)}") - @logger.debug("Arguments: #{arguments}") + description = options.fetch('_description', "script #{script}") + log_action("Starting: #{description} on #{targets.map(&:uri)}") + notify = proc { |event| @notifier.notify(callback, event) if callback } options = { '_run_as' => run_as }.merge(options) if run_as results = batch_execute(targets) do |transport, batch| - transport.batch_script(batch, script, arguments, options, &notify) + with_node_logging("Running script #{script} with '#{arguments}'", batch) do + transport.batch_script(batch, script, arguments, options, &notify) + end end - @logger.info(summary('script', script, results)) + log_action(summary(description, results)) @notifier.shutdown results end def run_task(targets, task, arguments, options = {}, &callback) - task_name = task.name - @logger.info("Starting task #{task_name} on #{targets.map(&:uri)}") - @logger.debug("Arguments: #{arguments} Input method: #{task.input_method}") + description = options.fetch('_description', "task #{task.name}") + log_action("Starting: #{description} on #{targets.map(&:uri)}") + notify = proc { |event| @notifier.notify(callback, event) if callback } options = { '_run_as' => run_as }.merge(options) if run_as results = batch_execute(targets) do |transport, batch| - transport.batch_task(batch, task, arguments, options, &notify) + with_node_logging("Running task #{task.name} with '#{arguments}' via #{task.input_method}", batch) do + transport.batch_task(batch, task, arguments, options, &notify) + end end - @logger.info(summary('task', task_name, results)) + log_action(summary(description, results)) @notifier.shutdown results end def file_upload(targets, source, destination, options = {}, &callback) - @logger.info("Starting file upload from #{source} to #{destination} on #{targets.map(&:uri)}") + description = options.fetch('_description', "file upload from #{source} to #{destination}") + log_action("Starting: #{description} on #{targets.map(&:uri)}") notify = proc { |event| @notifier.notify(callback, event) if callback } options = { '_run_as' => run_as }.merge(options) if run_as results = batch_execute(targets) do |transport, batch| - transport.batch_upload(batch, source, destination, options, &notify) + with_node_logging("Uploading file #{source} to #{destination}", batch) do + transport.batch_upload(batch, source, destination, options, &notify) + end end - @logger.info(summary('upload', source, results)) + log_action(summary(description, results)) @notifier.shutdown results - end - - def puppetdb_client - return @puppetdb_client if @puppetdb_client - puppetdb_config = Bolt::PuppetDB::Config.new(nil, @config.puppetdb) - @puppetdb_client = Bolt::PuppetDB::Client.from_config(puppetdb_config) - end - - def puppetdb_fact(certnames) - puppetdb_client.facts_for_node(certnames) - rescue StandardError => e - raise Bolt::CLIError, "Could not retrieve targets from PuppetDB: #{e}" end end end