lib/bolt/executor.rb in bolt-0.16.1 vs lib/bolt/executor.rb in bolt-0.16.2
- old
+ new
@@ -1,170 +1,156 @@
+# Used for $ERROR_INFO. This *must* be capitalized!
+require 'English'
require 'json'
require 'concurrent'
require 'logging'
require 'bolt/result'
require 'bolt/config'
require 'bolt/notifier'
-require 'bolt/node'
require 'bolt/result_set'
+require 'bolt/transport/ssh'
+require 'bolt/transport/winrm'
+require 'bolt/transport/orch'
module Bolt
class Executor
- attr_reader :noop
+ attr_reader :noop, :transports
attr_accessor :run_as
def initialize(config = Bolt::Config.new, noop = nil, plan_logging = false)
@config = config
@logger = Logging.logger[self]
+ @transports = {
+ 'ssh' => Concurrent::Delay.new { Bolt::Transport::SSH.new(config[:transports][:ssh] || {}) },
+ 'winrm' => Concurrent::Delay.new { Bolt::Transport::WinRM.new(config[:transports][:winrm] || {}) },
+ 'pcp' => Concurrent::Delay.new { Bolt::Transport::Orch.new(config[:transports][:pcp] || {}) }
+ }
+
# If a specific elevated log level has been requested, honor that.
# Otherwise, escalate the log level to "info" if running in plan mode, so
# that certain progress messages will be visible.
default_log_level = plan_logging ? :info : :notice
@logger.level = @config[:log_level] || default_log_level
@noop = noop
@run_as = nil
+ @pool = Concurrent::CachedThreadPool.new(max_threads: @config[:concurrency])
+ @logger.debug { "Started with #{@config[:concurrency]} max thread(s)" }
@notifier = Bolt::Notifier.new
end
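    # Editor's sketch (not present in either release): how the log-level logic
    # above resolves, assuming `config` supports the same hash-style lookup of
    # :log_level used elsewhere in this file.
    #
    #   Bolt::Executor.new(config)             # level = config[:log_level] || :notice
    #   Bolt::Executor.new(config, nil, true)  # level = config[:log_level] || :info (plan logging)
    #   # An explicit config[:log_level], e.g. :debug, wins in both cases.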
- def from_targets(targets)
- targets.map do |target|
- Bolt::Node.from_target(target)
- end
+ def transport(transport)
+ @transports[transport || 'ssh'].value
end
- private :from_targets
- def on(nodes, callback = nil)
- results = Concurrent::Array.new
-
- poolsize = [nodes.length, @config[:concurrency]].min
- pool = Concurrent::FixedThreadPool.new(poolsize)
- @logger.debug { "Started with #{poolsize} thread(s)" }
-
- nodes.map(&:class).uniq.each do |klass|
- klass.initialize_transport(@logger)
- end
-
- nodes.each { |node|
- pool.post do
- result =
- begin
- @notifier.notify(callback, type: :node_start, target: node.target) if callback
- node.connect
- yield node
- rescue StandardError => ex
- Bolt::Result.from_exception(node.target, ex)
- ensure
- begin
- node.disconnect
- rescue StandardError => ex
- @logger.info("Failed to close connection to #{node.uri} : #{ex.message}")
- end
- end
- results.concat([result])
- @notifier.notify(callback, type: :node_result, result: result) if callback
- end
- }
- pool.shutdown
- pool.wait_for_termination
-
- @notifier.shutdown
-
- Bolt::ResultSet.new(results)
- end
- private :on
-
def summary(action, object, result)
fc = result.error_set.length
npl = result.length == 1 ? '' : 's'
fpl = fc == 1 ? '' : 's'
"Ran #{action} '#{object}' on #{result.length} node#{npl} with #{fc} failure#{fpl}"
end
private :summary
- def get_run_as(node, options)
- if node.run_as.nil? && run_as
- { '_run_as' => run_as }.merge(options)
- else
- options
+ # Execute the given block on a list of nodes in parallel, one thread per "batch".
+ #
+ # This is the main driver of execution on a list of targets. It first
+ # groups targets by transport, then divides each group into batches as
+ # defined by the transport. Each batch, along with the corresponding
+ # transport, is yielded to the block in turn and the results all collected
+ # into a single ResultSet.
+ def batch_execute(targets)
+      promises = targets.group_by(&:protocol).flat_map do |protocol, protocol_targets|
+        transport = transport(protocol)
+        transport.batches(protocol_targets).flat_map do |batch|
+ batch_promises = Hash[Array(batch).map { |target| [target, Concurrent::Promise.new(executor: :immediate)] }]
+ # Pass this argument through to avoid retaining a reference to a
+ # local variable that will change on the next iteration of the loop.
+ @pool.post(batch_promises) do |result_promises|
+ begin
+ results = yield transport, batch
+ Array(results).each do |result|
+ result_promises[result.target].set(result)
+ end
+ # NotImplementedError can be thrown if the transport is implemented improperly
+ rescue StandardError, NotImplementedError => e
+ result_promises.each do |target, promise|
+ promise.set(Bolt::Result.from_exception(target, e))
+ end
+ ensure
+ # Make absolutely sure every promise gets a result to avoid a
+ # deadlock. Use whatever exception is causing this block to
+ # execute, or generate one if we somehow got here without an
+ # exception and some promise is still missing a result.
+ result_promises.each do |target, promise|
+ next if promise.fulfilled?
+ error = $ERROR_INFO || Bolt::Error.new("No result was returned for #{target.uri}",
+ "puppetlabs.bolt/missing-result-error")
+              promise.set(Bolt::Result.from_exception(target, error))
+ end
+ end
+ end
+ batch_promises.values
+ end
end
+ ResultSet.new(promises.map(&:value))
end
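    # Editor's sketch (not present in either release): a self-contained model of
    # the promise bookkeeping in batch_execute above. One Concurrent::Promise per
    # target is created before the work is posted to the pool; the worker fulfils
    # each promise with a result or an error, and the ensure sweep guarantees no
    # promise is left unfulfilled, so ResultSet.new(promises.map(&:value)) can
    # never block forever. FakeTarget and the string values are hypothetical
    # stand-ins for Bolt targets and Bolt::Result objects.
    #
    #   require 'concurrent'
    #
    #   FakeTarget = Struct.new(:uri)
    #   batch = [FakeTarget.new('node1'), FakeTarget.new('node2')]
    #   pool  = Concurrent::CachedThreadPool.new(max_threads: 2)
    #
    #   promises = Hash[batch.map { |t| [t, Concurrent::Promise.new(executor: :immediate)] }]
    #   pool.post(promises) do |result_promises|
    #     begin
    #       batch.each { |t| result_promises[t].set("ok: #{t.uri}") } # stand-in for `yield transport, batch`
    #     rescue StandardError => e
    #       result_promises.each_value { |p| p.set("error: #{e.message}") unless p.fulfilled? }
    #     ensure
    #       result_promises.each_value { |p| p.set('missing result') unless p.fulfilled? }
    #     end
    #   end
    #
    #   promises.values.map(&:value) # blocks until every target has a value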
- private :get_run_as
- def with_exception_handling(node)
- yield
- rescue StandardError => e
- Bolt::Result.from_exception(node.target, e)
- end
- private :with_exception_handling
+ def run_command(targets, command, options = {}, &callback)
+ @logger.info("Starting command run '#{command}' on #{targets.map(&:uri)}")
+ notify = proc { |event| @notifier.notify(callback, event) if callback }
+ options = { '_run_as' => run_as }.merge(options) if run_as
- def run_command(targets, command, options = {})
- nodes = from_targets(targets)
- @logger.info("Starting command run '#{command}' on #{nodes.map(&:uri)}")
- callback = block_given? ? Proc.new : nil
-
- r = on(nodes, callback) do |node|
- @logger.debug("Running command '#{command}' on #{node.uri}")
- node_result = with_exception_handling(node) do
- node.run_command(command, get_run_as(node, options))
- end
- @logger.debug("Result on #{node.uri}: #{JSON.dump(node_result.value)}")
- node_result
+ results = batch_execute(targets) do |transport, batch|
+        transport.batch_command(batch, command, options, &notify)
end
- @logger.info(summary('command', command, r))
- r
+
+ @logger.info(summary('command', command, results))
+ @notifier.shutdown
+ results
end
- def run_script(targets, script, arguments, options = {})
- nodes = from_targets(targets)
- @logger.info("Starting script run #{script} on #{nodes.map(&:uri)}")
+ def run_script(targets, script, arguments, options = {}, &callback)
+ @logger.info("Starting script run #{script} on #{targets.map(&:uri)}")
@logger.debug("Arguments: #{arguments}")
- callback = block_given? ? Proc.new : nil
+ notify = proc { |event| @notifier.notify(callback, event) if callback }
+ options = { '_run_as' => run_as }.merge(options) if run_as
- r = on(nodes, callback) do |node|
- @logger.debug { "Running script '#{script}' on #{node.uri}" }
- node_result = with_exception_handling(node) do
- node.run_script(script, arguments, get_run_as(node, options))
- end
- @logger.debug("Result on #{node.uri}: #{JSON.dump(node_result.value)}")
- node_result
+ results = batch_execute(targets) do |transport, batch|
+        transport.batch_script(batch, script, arguments, options, &notify)
end
- @logger.info(summary('script', script, r))
- r
+
+ @logger.info(summary('script', script, results))
+ @notifier.shutdown
+ results
end
- def run_task(targets, task, input_method, arguments, options = {})
- nodes = from_targets(targets)
- @logger.info("Starting task #{task} on #{nodes.map(&:uri)}")
- @logger.debug("Arguments: #{arguments} Input method: #{input_method}")
- callback = block_given? ? Proc.new : nil
+ def run_task(targets, task, arguments, options = {}, &callback)
+ task_name = task.name
+ @logger.info("Starting task #{task_name} on #{targets.map(&:uri)}")
+ @logger.debug("Arguments: #{arguments} Input method: #{task.input_method}")
+ notify = proc { |event| @notifier.notify(callback, event) if callback }
+ options = { '_run_as' => run_as }.merge(options) if run_as
- r = on(nodes, callback) do |node|
- @logger.debug { "Running task run '#{task}' on #{node.uri}" }
- node_result = with_exception_handling(node) do
- node.run_task(task, input_method, arguments, get_run_as(node, options))
- end
- @logger.debug("Result on #{node.uri}: #{JSON.dump(node_result.value)}")
- node_result
+ results = batch_execute(targets) do |transport, batch|
+        transport.batch_task(batch, task, arguments, options, &notify)
end
- @logger.info(summary('task', task, r))
- r
+
+ @logger.info(summary('task', task_name, results))
+ @notifier.shutdown
+ results
end
- def file_upload(targets, source, destination, options = {})
- nodes = from_targets(targets)
- @logger.info("Starting file upload from #{source} to #{destination} on #{nodes.map(&:uri)}")
- callback = block_given? ? Proc.new : nil
+ def file_upload(targets, source, destination, options = {}, &callback)
+ @logger.info("Starting file upload from #{source} to #{destination} on #{targets.map(&:uri)}")
+ notify = proc { |event| @notifier.notify(callback, event) if callback }
+ options = { '_run_as' => run_as }.merge(options) if run_as
- r = on(nodes, callback) do |node|
- @logger.debug { "Uploading: '#{source}' to #{destination} on #{node.uri}" }
- node_result = with_exception_handling(node) do
- node.upload(source, destination, options)
- end
- @logger.debug("Result on #{node.uri}: #{JSON.dump(node_result.value)}")
- node_result
+ results = batch_execute(targets) do |transport, batch|
+        transport.batch_upload(batch, source, destination, options, &notify)
end
- @logger.info(summary('upload', source, r))
- r
+
+ @logger.info(summary('upload', source, results))
+ @notifier.shutdown
+ results
end
end
end
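
For orientation, a rough usage sketch of the 0.16.2 API shown above. This is an
editor's illustration, not code from the gem: the Bolt::Target constructor, the
'ssh://...' URI form, and the exact event hashes passed to the callback are
assumptions (the removed 0.16.1 code emitted { type: :node_start, ... } and
{ type: :node_result, result: ... } events through Bolt::Notifier).

    require 'bolt/executor'
    require 'bolt/target'  # assumed path for the Target class referenced above

    executor = Bolt::Executor.new            # defaults to Bolt::Config.new
    targets  = [Bolt::Target.new('ssh://node1.example.com')]  # hypothetical constructor

    results = executor.run_command(targets, 'hostname') do |event|
      # Events are forwarded through Bolt::Notifier; their exact shape depends on the transport.
      puts event.inspect
    end

    puts results.class  # => Bolt::ResultSet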