lib/bolt/executor.rb in bolt-1.19.0 vs lib/bolt/executor.rb in bolt-1.20.0

- old
+ new

@@ -6,11 +6,10 @@ require 'logging' require 'set' require 'bolt/analytics' require 'bolt/result' require 'bolt/config' -require 'bolt/notifier' require 'bolt/result_set' require 'bolt/puppetdb' module Bolt class Executor @@ -39,30 +38,49 @@ val.new end end end @reported_transports = Set.new + @subscribers = Set.new + @publisher = Concurrent::SingleThreadExecutor.new @noop = noop @run_as = nil @pool = if concurrency > 0 Concurrent::ThreadPoolExecutor.new(max_threads: concurrency) else Concurrent.global_immediate_executor end @logger.debug { "Started with #{concurrency} max thread(s)" } - @notifier = Bolt::Notifier.new end def transport(transport) impl = @transports[transport || 'ssh'] raise(Bolt::UnknownTransportError, transport) unless impl # If there was an error creating the transport, ensure it gets thrown impl.no_error! impl.value end + def subscribe(subscriber) + @subscribers << subscriber + self + end + + def publish_event(event) + @subscribers.each do |subscriber| + @publisher.post(subscriber) do |sub| + sub.handle_event(event) + end + end + end + + def shutdown + @publisher.shutdown + @publisher.wait_for_termination + end + # Starts executing the given block on a list of nodes in parallel, one thread per "batch". # # This is the main driver of execution on a list of targets. It first # groups targets by transport, then divides each group into batches as # defined by the transport. Yields each batch, along with the corresponding @@ -127,43 +145,31 @@ promises = queue_execute(targets, &block) await_results(promises) end def log_action(description, targets) - # When running a plan, info messages like starting a task are promoted to notice. - log_method = @plan_logging ? :notice : :info - target_str = if targets.length > 5 - "#{targets.count} targets" - else - targets.map(&:uri).join(', ') - end + publish_event(type: :step_start, description: description, targets: targets) - @logger.send(log_method, "Starting: #{description} on #{target_str}") - start_time = Time.now results = yield duration = Time.now - start_time - failures = results.error_set.length - plural = failures == 1 ? '' : 's' + publish_event(type: :step_finish, description: description, result: results, duration: duration) - @logger.send(log_method, "Finished: #{description} with #{failures} failure#{plural} in #{duration.round(2)} sec") - results end def log_plan(plan_name) - log_method = @plan_logging ? :notice : :info - @logger.send(log_method, "Starting: plan #{plan_name}") + publish_event(type: :plan_start, plan: plan_name) start_time = Time.now results = nil begin results = yield ensure duration = Time.now - start_time - @logger.send(log_method, "Finished: plan #{plan_name} in #{duration.round(2)} sec") + publish_event(type: :plan_finish, plan: plan_name, duration: duration) end results end @@ -218,76 +224,60 @@ result = yield @logger.info(result.to_json) result end - def run_command(targets, command, options = {}, &callback) + def run_command(targets, command, options = {}) description = options.fetch('_description', "command '#{command}'") log_action(description, targets) do - notify = proc { |event| @notifier.notify(callback, event) if callback } options = { '_run_as' => run_as }.merge(options) if run_as - results = batch_execute(targets) do |transport, batch| + batch_execute(targets) do |transport, batch| with_node_logging("Running command '#{command}'", batch) do - transport.batch_command(batch, command, options, &notify) + transport.batch_command(batch, command, options, &method(:publish_event)) end end - - @notifier.shutdown - results end end - def run_script(targets, script, arguments, options = {}, &callback) + def run_script(targets, script, arguments, options = {}) description = options.fetch('_description', "script #{script}") log_action(description, targets) do - notify = proc { |event| @notifier.notify(callback, event) if callback } options = { '_run_as' => run_as }.merge(options) if run_as - results = batch_execute(targets) do |transport, batch| + batch_execute(targets) do |transport, batch| with_node_logging("Running script #{script} with '#{arguments}'", batch) do - transport.batch_script(batch, script, arguments, options, &notify) + transport.batch_script(batch, script, arguments, options, &method(:publish_event)) end end - - @notifier.shutdown - results end end - def run_task(targets, task, arguments, options = {}, &callback) + def run_task(targets, task, arguments, options = {}) description = options.fetch('_description', "task #{task.name}") log_action(description, targets) do - notify = proc { |event| @notifier.notify(callback, event) if callback } options = { '_run_as' => run_as }.merge(options) if run_as arguments['_task'] = task.name - results = batch_execute(targets) do |transport, batch| + batch_execute(targets) do |transport, batch| with_node_logging("Running task #{task.name} with '#{arguments}'", batch) do - transport.batch_task(batch, task, arguments, options, &notify) + transport.batch_task(batch, task, arguments, options, &method(:publish_event)) end end - - @notifier.shutdown - results end end - def upload_file(targets, source, destination, options = {}, &callback) + def upload_file(targets, source, destination, options = {}) description = options.fetch('_description', "file upload from #{source} to #{destination}") log_action(description, targets) do - notify = proc { |event| @notifier.notify(callback, event) if callback } options = { '_run_as' => run_as }.merge(options) if run_as - results = batch_execute(targets) do |transport, batch| + batch_execute(targets) do |transport, batch| with_node_logging("Uploading file #{source} to #{destination}", batch) do - transport.batch_upload(batch, source, destination, options, &notify) + transport.batch_upload(batch, source, destination, options, &method(:publish_event)) end end - - @notifier.shutdown - results end end class TimeoutError < RuntimeError; end @@ -332,21 +322,19 @@ # # In the future if other transports need this or if we want a plan stack # we'll need to refactor. def start_plan(plan_context) transport('pcp').plan_context = plan_context - @plan_logging = true end def finish_plan(plan_result) transport('pcp').finish_plan(plan_result) end def without_default_logging - old_log = @plan_logging - @plan_logging = false + publish_event(type: :disable_default_output) yield ensure - @plan_logging = old_log + publish_event(type: :enable_default_output) end end end