lib/bolt/executor.rb in bolt-0.19.0 vs lib/bolt/executor.rb in bolt-0.19.1
- old
+ new
@@ -14,14 +14,14 @@
module Bolt
class Executor
attr_reader :noop, :transports
attr_accessor :run_as
- def initialize(config = Bolt::Config.new, noop = nil, plan_logging = false)
+ def initialize(config = Bolt::Config.new, noop = nil)
@config = config
@logger = Logging.logger[self]
- @plan_logging = plan_logging
+ @plan_logging = false
@transports = Bolt::TRANSPORTS.each_with_object({}) do |(key, val), coll|
coll[key.to_s] = Concurrent::Delay.new { val.new }
end
@@ -32,23 +32,16 @@
@notifier = Bolt::Notifier.new
end
def transport(transport)
impl = @transports[transport || 'ssh']
+ raise(Bolt::UnknownTransportError, transport) unless impl
# If there was an error creating the transport, ensure it gets thrown
impl.no_error!
impl.value
end
- def summary(description, result)
- fc = result.error_set.length
- npl = result.length == 1 ? '' : 's'
- fpl = fc == 1 ? '' : 's'
- "Finished: #{description} on #{result.length} node#{npl} with #{fc} failure#{fpl}"
- end
- private :summary
-
# Execute the given block on a list of nodes in parallel, one thread per "batch".
#
# This is the main driver of execution on a list of targets. It first
# groups targets by transport, then divides each group into batches as
# defined by the transport. Each batch, along with the corresponding
@@ -91,13 +84,31 @@
end
end
ResultSet.new(promises.map(&:value))
end
- # When running a plan, info messages like starting a task are promoted to notice.
- def log_action(msg)
- @plan_logging ? @logger.notice(msg) : @logger.info(msg)
+ def log_action(description, targets)
+ # When running a plan, info messages like starting a task are promoted to notice.
+ log_method = @plan_logging ? :notice : :info
+ target_str = if targets.length > 5
+ "#{targets.count} targets"
+ else
+ targets.map(&:uri).join(', ')
+ end
+
+ @logger.send(log_method, "Starting: #{description} on #{target_str}")
+
+ start_time = Time.now
+ results = yield
+ duration = Time.now - start_time
+
+ failures = results.error_set.length
+ plural = failures == 1 ? '' : 's'
+
+ @logger.send(log_method, "Finished: #{description} with #{failures} failure#{plural} in #{duration.round(2)} sec")
+
+ results
end
private :log_action
def with_node_logging(description, batch)
@logger.info("#{description} on #{batch.map(&:uri)}")
@@ -107,74 +118,91 @@
end
private :with_node_logging
def run_command(targets, command, options = {}, &callback)
description = options.fetch('_description', "command '#{command}'")
- log_action("Starting: #{description} on #{targets.map(&:uri)}")
- notify = proc { |event| @notifier.notify(callback, event) if callback }
- options = { '_run_as' => run_as }.merge(options) if run_as
+ log_action(description, targets) do
+ notify = proc { |event| @notifier.notify(callback, event) if callback }
+ options = { '_run_as' => run_as }.merge(options) if run_as
- results = batch_execute(targets) do |transport, batch|
- with_node_logging("Running command '#{command}'", batch) do
- transport.batch_command(batch, command, options, ¬ify)
+ results = batch_execute(targets) do |transport, batch|
+ with_node_logging("Running command '#{command}'", batch) do
+ transport.batch_command(batch, command, options, ¬ify)
+ end
end
- end
- log_action(summary(description, results))
- @notifier.shutdown
- results
+ @notifier.shutdown
+ results
+ end
end
def run_script(targets, script, arguments, options = {}, &callback)
description = options.fetch('_description', "script #{script}")
- log_action("Starting: #{description} on #{targets.map(&:uri)}")
+ log_action(description, targets) do
+ notify = proc { |event| @notifier.notify(callback, event) if callback }
+ options = { '_run_as' => run_as }.merge(options) if run_as
- notify = proc { |event| @notifier.notify(callback, event) if callback }
- options = { '_run_as' => run_as }.merge(options) if run_as
-
- results = batch_execute(targets) do |transport, batch|
- with_node_logging("Running script #{script} with '#{arguments}'", batch) do
- transport.batch_script(batch, script, arguments, options, ¬ify)
+ results = batch_execute(targets) do |transport, batch|
+ with_node_logging("Running script #{script} with '#{arguments}'", batch) do
+ transport.batch_script(batch, script, arguments, options, ¬ify)
+ end
end
- end
- log_action(summary(description, results))
- @notifier.shutdown
- results
+ @notifier.shutdown
+ results
+ end
end
def run_task(targets, task, arguments, options = {}, &callback)
description = options.fetch('_description', "task #{task.name}")
- log_action("Starting: #{description} on #{targets.map(&:uri)}")
+ log_action(description, targets) do
+ notify = proc { |event| @notifier.notify(callback, event) if callback }
+ options = { '_run_as' => run_as }.merge(options) if run_as
- notify = proc { |event| @notifier.notify(callback, event) if callback }
- options = { '_run_as' => run_as }.merge(options) if run_as
-
- results = batch_execute(targets) do |transport, batch|
- with_node_logging("Running task #{task.name} with '#{arguments}' via #{task.input_method}", batch) do
- transport.batch_task(batch, task, arguments, options, ¬ify)
+ results = batch_execute(targets) do |transport, batch|
+ with_node_logging("Running task #{task.name} with '#{arguments}' via #{task.input_method}", batch) do
+ transport.batch_task(batch, task, arguments, options, ¬ify)
+ end
end
- end
- log_action(summary(description, results))
- @notifier.shutdown
- results
+ @notifier.shutdown
+ results
+ end
end
def file_upload(targets, source, destination, options = {}, &callback)
description = options.fetch('_description', "file upload from #{source} to #{destination}")
- log_action("Starting: #{description} on #{targets.map(&:uri)}")
- notify = proc { |event| @notifier.notify(callback, event) if callback }
- options = { '_run_as' => run_as }.merge(options) if run_as
+ log_action(description, targets) do
+ notify = proc { |event| @notifier.notify(callback, event) if callback }
+ options = { '_run_as' => run_as }.merge(options) if run_as
- results = batch_execute(targets) do |transport, batch|
- with_node_logging("Uploading file #{source} to #{destination}", batch) do
- transport.batch_upload(batch, source, destination, options, ¬ify)
+ results = batch_execute(targets) do |transport, batch|
+ with_node_logging("Uploading file #{source} to #{destination}", batch) do
+ transport.batch_upload(batch, source, destination, options, ¬ify)
+ end
end
+
+ @notifier.shutdown
+ results
end
+ end
- log_action(summary(description, results))
- @notifier.shutdown
- results
+ # Plan context doesn't make sense for most transports but it is tightly
+ # coupled with the orchestrator transport since the transport behaves
+ # differently when a plan is running. In order to limit how much this
+ # pollutes the transport API we only handle the orchestrator transport here.
+ # Since we callt this function without resolving targets this will result
+ # in the orchestrator transport always being initialized during plan runs.
+ # For now that's ok.
+ #
+ # In the future if other transports need this or if we want a plan stack
+ # we'll need to refactor.
+ def start_plan(plan_context)
+ transport('pcp').plan_context = plan_context
+ @plan_logging = true
+ end
+
+ def finish_plan(plan_result)
+ transport('pcp').finish_plan(plan_result)
end
end
end