lib/bolt/executor.rb in bolt-0.16.1 vs lib/bolt/executor.rb in bolt-0.16.2

- old
+ new

@@ -1,170 +1,156 @@ +# Used for $ERROR_INFO. This *must* be capitalized! +require 'English' require 'json' require 'concurrent' require 'logging' require 'bolt/result' require 'bolt/config' require 'bolt/notifier' -require 'bolt/node' require 'bolt/result_set' +require 'bolt/transport/ssh' +require 'bolt/transport/winrm' +require 'bolt/transport/orch' module Bolt class Executor - attr_reader :noop + attr_reader :noop, :transports attr_accessor :run_as def initialize(config = Bolt::Config.new, noop = nil, plan_logging = false) @config = config @logger = Logging.logger[self] + @transports = { + 'ssh' => Concurrent::Delay.new { Bolt::Transport::SSH.new(config[:transports][:ssh] || {}) }, + 'winrm' => Concurrent::Delay.new { Bolt::Transport::WinRM.new(config[:transports][:winrm] || {}) }, + 'pcp' => Concurrent::Delay.new { Bolt::Transport::Orch.new(config[:transports][:pcp] || {}) } + } + # If a specific elevated log level has been requested, honor that. # Otherwise, escalate the log level to "info" if running in plan mode, so # that certain progress messages will be visible. default_log_level = plan_logging ? :info : :notice @logger.level = @config[:log_level] || default_log_level @noop = noop @run_as = nil + @pool = Concurrent::CachedThreadPool.new(max_threads: @config[:concurrency]) + @logger.debug { "Started with #{@config[:concurrency]} max thread(s)" } @notifier = Bolt::Notifier.new end - def from_targets(targets) - targets.map do |target| - Bolt::Node.from_target(target) - end + def transport(transport) + @transports[transport || 'ssh'].value end - private :from_targets - def on(nodes, callback = nil) - results = Concurrent::Array.new - - poolsize = [nodes.length, @config[:concurrency]].min - pool = Concurrent::FixedThreadPool.new(poolsize) - @logger.debug { "Started with #{poolsize} thread(s)" } - - nodes.map(&:class).uniq.each do |klass| - klass.initialize_transport(@logger) - end - - nodes.each { |node| - pool.post do - result = - begin - @notifier.notify(callback, type: :node_start, target: node.target) if callback - node.connect - yield node - rescue StandardError => ex - Bolt::Result.from_exception(node.target, ex) - ensure - begin - node.disconnect - rescue StandardError => ex - @logger.info("Failed to close connection to #{node.uri} : #{ex.message}") - end - end - results.concat([result]) - @notifier.notify(callback, type: :node_result, result: result) if callback - end - } - pool.shutdown - pool.wait_for_termination - - @notifier.shutdown - - Bolt::ResultSet.new(results) - end - private :on - def summary(action, object, result) fc = result.error_set.length npl = result.length == 1 ? '' : 's' fpl = fc == 1 ? '' : 's' "Ran #{action} '#{object}' on #{result.length} node#{npl} with #{fc} failure#{fpl}" end private :summary - def get_run_as(node, options) - if node.run_as.nil? && run_as - { '_run_as' => run_as }.merge(options) - else - options + # Execute the given block on a list of nodes in parallel, one thread per "batch". + # + # This is the main driver of execution on a list of targets. It first + # groups targets by transport, then divides each group into batches as + # defined by the transport. Each batch, along with the corresponding + # transport, is yielded to the block in turn and the results all collected + # into a single ResultSet. + def batch_execute(targets) + promises = targets.group_by(&:protocol).flat_map do |protocol, _protocol_targets| + transport = transport(protocol) + transport.batches(targets).flat_map do |batch| + batch_promises = Hash[Array(batch).map { |target| [target, Concurrent::Promise.new(executor: :immediate)] }] + # Pass this argument through to avoid retaining a reference to a + # local variable that will change on the next iteration of the loop. + @pool.post(batch_promises) do |result_promises| + begin + results = yield transport, batch + Array(results).each do |result| + result_promises[result.target].set(result) + end + # NotImplementedError can be thrown if the transport is implemented improperly + rescue StandardError, NotImplementedError => e + result_promises.each do |target, promise| + promise.set(Bolt::Result.from_exception(target, e)) + end + ensure + # Make absolutely sure every promise gets a result to avoid a + # deadlock. Use whatever exception is causing this block to + # execute, or generate one if we somehow got here without an + # exception and some promise is still missing a result. + result_promises.each do |target, promise| + next if promise.fulfilled? + error = $ERROR_INFO || Bolt::Error.new("No result was returned for #{target.uri}", + "puppetlabs.bolt/missing-result-error") + promise.set(Bolt::Result.from_exception(error)) + end + end + end + batch_promises.values + end end + ResultSet.new(promises.map(&:value)) end - private :get_run_as - def with_exception_handling(node) - yield - rescue StandardError => e - Bolt::Result.from_exception(node.target, e) - end - private :with_exception_handling + def run_command(targets, command, options = {}, &callback) + @logger.info("Starting command run '#{command}' on #{targets.map(&:uri)}") + notify = proc { |event| @notifier.notify(callback, event) if callback } + options = { '_run_as' => run_as }.merge(options) if run_as - def run_command(targets, command, options = {}) - nodes = from_targets(targets) - @logger.info("Starting command run '#{command}' on #{nodes.map(&:uri)}") - callback = block_given? ? Proc.new : nil - - r = on(nodes, callback) do |node| - @logger.debug("Running command '#{command}' on #{node.uri}") - node_result = with_exception_handling(node) do - node.run_command(command, get_run_as(node, options)) - end - @logger.debug("Result on #{node.uri}: #{JSON.dump(node_result.value)}") - node_result + results = batch_execute(targets) do |transport, batch| + transport.batch_command(batch, command, options, &notify) end - @logger.info(summary('command', command, r)) - r + + @logger.info(summary('command', command, results)) + @notifier.shutdown + results end - def run_script(targets, script, arguments, options = {}) - nodes = from_targets(targets) - @logger.info("Starting script run #{script} on #{nodes.map(&:uri)}") + def run_script(targets, script, arguments, options = {}, &callback) + @logger.info("Starting script run #{script} on #{targets.map(&:uri)}") @logger.debug("Arguments: #{arguments}") - callback = block_given? ? Proc.new : nil + notify = proc { |event| @notifier.notify(callback, event) if callback } + options = { '_run_as' => run_as }.merge(options) if run_as - r = on(nodes, callback) do |node| - @logger.debug { "Running script '#{script}' on #{node.uri}" } - node_result = with_exception_handling(node) do - node.run_script(script, arguments, get_run_as(node, options)) - end - @logger.debug("Result on #{node.uri}: #{JSON.dump(node_result.value)}") - node_result + results = batch_execute(targets) do |transport, batch| + transport.batch_script(batch, script, arguments, options, &notify) end - @logger.info(summary('script', script, r)) - r + + @logger.info(summary('script', script, results)) + @notifier.shutdown + results end - def run_task(targets, task, input_method, arguments, options = {}) - nodes = from_targets(targets) - @logger.info("Starting task #{task} on #{nodes.map(&:uri)}") - @logger.debug("Arguments: #{arguments} Input method: #{input_method}") - callback = block_given? ? Proc.new : nil + def run_task(targets, task, arguments, options = {}, &callback) + task_name = task.name + @logger.info("Starting task #{task_name} on #{targets.map(&:uri)}") + @logger.debug("Arguments: #{arguments} Input method: #{task.input_method}") + notify = proc { |event| @notifier.notify(callback, event) if callback } + options = { '_run_as' => run_as }.merge(options) if run_as - r = on(nodes, callback) do |node| - @logger.debug { "Running task run '#{task}' on #{node.uri}" } - node_result = with_exception_handling(node) do - node.run_task(task, input_method, arguments, get_run_as(node, options)) - end - @logger.debug("Result on #{node.uri}: #{JSON.dump(node_result.value)}") - node_result + results = batch_execute(targets) do |transport, batch| + transport.batch_task(batch, task, arguments, options, &notify) end - @logger.info(summary('task', task, r)) - r + + @logger.info(summary('task', task_name, results)) + @notifier.shutdown + results end - def file_upload(targets, source, destination, options = {}) - nodes = from_targets(targets) - @logger.info("Starting file upload from #{source} to #{destination} on #{nodes.map(&:uri)}") - callback = block_given? ? Proc.new : nil + def file_upload(targets, source, destination, options = {}, &callback) + @logger.info("Starting file upload from #{source} to #{destination} on #{targets.map(&:uri)}") + notify = proc { |event| @notifier.notify(callback, event) if callback } + options = { '_run_as' => run_as }.merge(options) if run_as - r = on(nodes, callback) do |node| - @logger.debug { "Uploading: '#{source}' to #{destination} on #{node.uri}" } - node_result = with_exception_handling(node) do - node.upload(source, destination, options) - end - @logger.debug("Result on #{node.uri}: #{JSON.dump(node_result.value)}") - node_result + results = batch_execute(targets) do |transport, batch| + transport.batch_upload(batch, source, destination, options, &notify) end - @logger.info(summary('upload', source, r)) - r + + @logger.info(summary('upload', source, results)) + @notifier.shutdown + results end end end