lib/bolt/executor.rb in bolt-0.19.0 vs lib/bolt/executor.rb in bolt-0.19.1

- old
+ new

@@ -14,14 +14,14 @@ module Bolt class Executor attr_reader :noop, :transports attr_accessor :run_as - def initialize(config = Bolt::Config.new, noop = nil, plan_logging = false) + def initialize(config = Bolt::Config.new, noop = nil) @config = config @logger = Logging.logger[self] - @plan_logging = plan_logging + @plan_logging = false @transports = Bolt::TRANSPORTS.each_with_object({}) do |(key, val), coll| coll[key.to_s] = Concurrent::Delay.new { val.new } end @@ -32,23 +32,16 @@ @notifier = Bolt::Notifier.new end def transport(transport) impl = @transports[transport || 'ssh'] + raise(Bolt::UnknownTransportError, transport) unless impl # If there was an error creating the transport, ensure it gets thrown impl.no_error! impl.value end - def summary(description, result) - fc = result.error_set.length - npl = result.length == 1 ? '' : 's' - fpl = fc == 1 ? '' : 's' - "Finished: #{description} on #{result.length} node#{npl} with #{fc} failure#{fpl}" - end - private :summary - # Execute the given block on a list of nodes in parallel, one thread per "batch". # # This is the main driver of execution on a list of targets. It first # groups targets by transport, then divides each group into batches as # defined by the transport. Each batch, along with the corresponding @@ -91,13 +84,31 @@ end end ResultSet.new(promises.map(&:value)) end - # When running a plan, info messages like starting a task are promoted to notice. - def log_action(msg) - @plan_logging ? @logger.notice(msg) : @logger.info(msg) + def log_action(description, targets) + # When running a plan, info messages like starting a task are promoted to notice. + log_method = @plan_logging ? :notice : :info + target_str = if targets.length > 5 + "#{targets.count} targets" + else + targets.map(&:uri).join(', ') + end + + @logger.send(log_method, "Starting: #{description} on #{target_str}") + + start_time = Time.now + results = yield + duration = Time.now - start_time + + failures = results.error_set.length + plural = failures == 1 ? '' : 's' + + @logger.send(log_method, "Finished: #{description} with #{failures} failure#{plural} in #{duration.round(2)} sec") + + results end private :log_action def with_node_logging(description, batch) @logger.info("#{description} on #{batch.map(&:uri)}") @@ -107,74 +118,91 @@ end private :with_node_logging def run_command(targets, command, options = {}, &callback) description = options.fetch('_description', "command '#{command}'") - log_action("Starting: #{description} on #{targets.map(&:uri)}") - notify = proc { |event| @notifier.notify(callback, event) if callback } - options = { '_run_as' => run_as }.merge(options) if run_as + log_action(description, targets) do + notify = proc { |event| @notifier.notify(callback, event) if callback } + options = { '_run_as' => run_as }.merge(options) if run_as - results = batch_execute(targets) do |transport, batch| - with_node_logging("Running command '#{command}'", batch) do - transport.batch_command(batch, command, options, &notify) + results = batch_execute(targets) do |transport, batch| + with_node_logging("Running command '#{command}'", batch) do + transport.batch_command(batch, command, options, &notify) + end end - end - log_action(summary(description, results)) - @notifier.shutdown - results + @notifier.shutdown + results + end end def run_script(targets, script, arguments, options = {}, &callback) description = options.fetch('_description', "script #{script}") - log_action("Starting: #{description} on #{targets.map(&:uri)}") + log_action(description, targets) do + notify = proc { |event| @notifier.notify(callback, event) if callback } + options = { '_run_as' => run_as }.merge(options) if run_as - notify = proc { |event| @notifier.notify(callback, event) if callback } - options = { '_run_as' => run_as }.merge(options) if run_as - - results = batch_execute(targets) do |transport, batch| - with_node_logging("Running script #{script} with '#{arguments}'", batch) do - transport.batch_script(batch, script, arguments, options, &notify) + results = batch_execute(targets) do |transport, batch| + with_node_logging("Running script #{script} with '#{arguments}'", batch) do + transport.batch_script(batch, script, arguments, options, &notify) + end end - end - log_action(summary(description, results)) - @notifier.shutdown - results + @notifier.shutdown + results + end end def run_task(targets, task, arguments, options = {}, &callback) description = options.fetch('_description', "task #{task.name}") - log_action("Starting: #{description} on #{targets.map(&:uri)}") + log_action(description, targets) do + notify = proc { |event| @notifier.notify(callback, event) if callback } + options = { '_run_as' => run_as }.merge(options) if run_as - notify = proc { |event| @notifier.notify(callback, event) if callback } - options = { '_run_as' => run_as }.merge(options) if run_as - - results = batch_execute(targets) do |transport, batch| - with_node_logging("Running task #{task.name} with '#{arguments}' via #{task.input_method}", batch) do - transport.batch_task(batch, task, arguments, options, &notify) + results = batch_execute(targets) do |transport, batch| + with_node_logging("Running task #{task.name} with '#{arguments}' via #{task.input_method}", batch) do + transport.batch_task(batch, task, arguments, options, &notify) + end end - end - log_action(summary(description, results)) - @notifier.shutdown - results + @notifier.shutdown + results + end end def file_upload(targets, source, destination, options = {}, &callback) description = options.fetch('_description', "file upload from #{source} to #{destination}") - log_action("Starting: #{description} on #{targets.map(&:uri)}") - notify = proc { |event| @notifier.notify(callback, event) if callback } - options = { '_run_as' => run_as }.merge(options) if run_as + log_action(description, targets) do + notify = proc { |event| @notifier.notify(callback, event) if callback } + options = { '_run_as' => run_as }.merge(options) if run_as - results = batch_execute(targets) do |transport, batch| - with_node_logging("Uploading file #{source} to #{destination}", batch) do - transport.batch_upload(batch, source, destination, options, &notify) + results = batch_execute(targets) do |transport, batch| + with_node_logging("Uploading file #{source} to #{destination}", batch) do + transport.batch_upload(batch, source, destination, options, &notify) + end end + + @notifier.shutdown + results end + end - log_action(summary(description, results)) - @notifier.shutdown - results + # Plan context doesn't make sense for most transports but it is tightly + # coupled with the orchestrator transport since the transport behaves + # differently when a plan is running. In order to limit how much this + # pollutes the transport API we only handle the orchestrator transport here. + # Since we callt this function without resolving targets this will result + # in the orchestrator transport always being initialized during plan runs. + # For now that's ok. + # + # In the future if other transports need this or if we want a plan stack + # we'll need to refactor. + def start_plan(plan_context) + transport('pcp').plan_context = plan_context + @plan_logging = true + end + + def finish_plan(plan_result) + transport('pcp').finish_plan(plan_result) end end end