lib/bolt/executor.rb in bolt-1.19.0 vs lib/bolt/executor.rb in bolt-1.20.0
- old
+ new
@@ -6,11 +6,10 @@
require 'logging'
require 'set'
require 'bolt/analytics'
require 'bolt/result'
require 'bolt/config'
-require 'bolt/notifier'
require 'bolt/result_set'
require 'bolt/puppetdb'
module Bolt
class Executor
@@ -39,30 +38,49 @@
val.new
end
end
end
@reported_transports = Set.new
+ @subscribers = Set.new
+ @publisher = Concurrent::SingleThreadExecutor.new
@noop = noop
@run_as = nil
@pool = if concurrency > 0
Concurrent::ThreadPoolExecutor.new(max_threads: concurrency)
else
Concurrent.global_immediate_executor
end
@logger.debug { "Started with #{concurrency} max thread(s)" }
- @notifier = Bolt::Notifier.new
end
def transport(transport)
impl = @transports[transport || 'ssh']
raise(Bolt::UnknownTransportError, transport) unless impl
# If there was an error creating the transport, ensure it gets thrown
impl.no_error!
impl.value
end
+ def subscribe(subscriber)
+ @subscribers << subscriber
+ self
+ end
+
+ def publish_event(event)
+ @subscribers.each do |subscriber|
+ @publisher.post(subscriber) do |sub|
+ sub.handle_event(event)
+ end
+ end
+ end
+
+ def shutdown
+ @publisher.shutdown
+ @publisher.wait_for_termination
+ end
+
# Starts executing the given block on a list of nodes in parallel, one thread per "batch".
#
# This is the main driver of execution on a list of targets. It first
# groups targets by transport, then divides each group into batches as
# defined by the transport. Yields each batch, along with the corresponding
@@ -127,43 +145,31 @@
promises = queue_execute(targets, &block)
await_results(promises)
end
def log_action(description, targets)
- # When running a plan, info messages like starting a task are promoted to notice.
- log_method = @plan_logging ? :notice : :info
- target_str = if targets.length > 5
- "#{targets.count} targets"
- else
- targets.map(&:uri).join(', ')
- end
+ publish_event(type: :step_start, description: description, targets: targets)
- @logger.send(log_method, "Starting: #{description} on #{target_str}")
-
start_time = Time.now
results = yield
duration = Time.now - start_time
- failures = results.error_set.length
- plural = failures == 1 ? '' : 's'
+ publish_event(type: :step_finish, description: description, result: results, duration: duration)
- @logger.send(log_method, "Finished: #{description} with #{failures} failure#{plural} in #{duration.round(2)} sec")
-
results
end
def log_plan(plan_name)
- log_method = @plan_logging ? :notice : :info
- @logger.send(log_method, "Starting: plan #{plan_name}")
+ publish_event(type: :plan_start, plan: plan_name)
start_time = Time.now
results = nil
begin
results = yield
ensure
duration = Time.now - start_time
- @logger.send(log_method, "Finished: plan #{plan_name} in #{duration.round(2)} sec")
+ publish_event(type: :plan_finish, plan: plan_name, duration: duration)
end
results
end
@@ -218,76 +224,60 @@
result = yield
@logger.info(result.to_json)
result
end
- def run_command(targets, command, options = {}, &callback)
+ def run_command(targets, command, options = {})
description = options.fetch('_description', "command '#{command}'")
log_action(description, targets) do
- notify = proc { |event| @notifier.notify(callback, event) if callback }
options = { '_run_as' => run_as }.merge(options) if run_as
- results = batch_execute(targets) do |transport, batch|
+ batch_execute(targets) do |transport, batch|
with_node_logging("Running command '#{command}'", batch) do
- transport.batch_command(batch, command, options, ¬ify)
+ transport.batch_command(batch, command, options, &method(:publish_event))
end
end
-
- @notifier.shutdown
- results
end
end
- def run_script(targets, script, arguments, options = {}, &callback)
+ def run_script(targets, script, arguments, options = {})
description = options.fetch('_description', "script #{script}")
log_action(description, targets) do
- notify = proc { |event| @notifier.notify(callback, event) if callback }
options = { '_run_as' => run_as }.merge(options) if run_as
- results = batch_execute(targets) do |transport, batch|
+ batch_execute(targets) do |transport, batch|
with_node_logging("Running script #{script} with '#{arguments}'", batch) do
- transport.batch_script(batch, script, arguments, options, ¬ify)
+ transport.batch_script(batch, script, arguments, options, &method(:publish_event))
end
end
-
- @notifier.shutdown
- results
end
end
- def run_task(targets, task, arguments, options = {}, &callback)
+ def run_task(targets, task, arguments, options = {})
description = options.fetch('_description', "task #{task.name}")
log_action(description, targets) do
- notify = proc { |event| @notifier.notify(callback, event) if callback }
options = { '_run_as' => run_as }.merge(options) if run_as
arguments['_task'] = task.name
- results = batch_execute(targets) do |transport, batch|
+ batch_execute(targets) do |transport, batch|
with_node_logging("Running task #{task.name} with '#{arguments}'", batch) do
- transport.batch_task(batch, task, arguments, options, ¬ify)
+ transport.batch_task(batch, task, arguments, options, &method(:publish_event))
end
end
-
- @notifier.shutdown
- results
end
end
- def upload_file(targets, source, destination, options = {}, &callback)
+ def upload_file(targets, source, destination, options = {})
description = options.fetch('_description', "file upload from #{source} to #{destination}")
log_action(description, targets) do
- notify = proc { |event| @notifier.notify(callback, event) if callback }
options = { '_run_as' => run_as }.merge(options) if run_as
- results = batch_execute(targets) do |transport, batch|
+ batch_execute(targets) do |transport, batch|
with_node_logging("Uploading file #{source} to #{destination}", batch) do
- transport.batch_upload(batch, source, destination, options, ¬ify)
+ transport.batch_upload(batch, source, destination, options, &method(:publish_event))
end
end
-
- @notifier.shutdown
- results
end
end
class TimeoutError < RuntimeError; end
@@ -332,21 +322,19 @@
#
# In the future if other transports need this or if we want a plan stack
# we'll need to refactor.
def start_plan(plan_context)
transport('pcp').plan_context = plan_context
- @plan_logging = true
end
def finish_plan(plan_result)
transport('pcp').finish_plan(plan_result)
end
def without_default_logging
- old_log = @plan_logging
- @plan_logging = false
+ publish_event(type: :disable_default_output)
yield
ensure
- @plan_logging = old_log
+ publish_event(type: :enable_default_output)
end
end
end