lib/bolt/executor.rb in bolt-3.8.1 vs lib/bolt/executor.rb in bolt-3.9.0
- old
+ new
@@ -5,24 +5,24 @@
require 'json'
require 'logging'
require 'pathname'
require 'set'
require 'bolt/analytics'
-require 'bolt/result'
require 'bolt/config'
-require 'bolt/result_set'
+require 'bolt/fiber_executor'
require 'bolt/puppetdb'
+require 'bolt/result'
+require 'bolt/result_set'
# Load transports
require 'bolt/transport/docker'
require 'bolt/transport/local'
require 'bolt/transport/lxd'
require 'bolt/transport/orch'
require 'bolt/transport/podman'
require 'bolt/transport/remote'
require 'bolt/transport/ssh'
require 'bolt/transport/winrm'
-require 'bolt/yarn'
module Bolt
TRANSPORTS = {
docker: Bolt::Transport::Docker,
local: Bolt::Transport::Local,
@@ -33,11 +33,11 @@
ssh: Bolt::Transport::SSH,
winrm: Bolt::Transport::WinRM
}.freeze
class Executor
- attr_reader :noop, :transports, :in_parallel, :future
+ attr_reader :noop, :transports, :future
attr_accessor :run_as
def initialize(concurrency = 1,
analytics = Bolt::Analytics::NoopClient.new,
noop = false,
@@ -64,21 +64,21 @@
@publisher = Concurrent::SingleThreadExecutor.new
@publisher.post { Thread.current[:name] = 'event-publisher' }
@noop = noop
@run_as = nil
- @in_parallel = false
@future = future
@pool = if concurrency > 0
Concurrent::ThreadPoolExecutor.new(name: 'exec', max_threads: concurrency)
else
Concurrent.global_immediate_executor
end
@logger.debug { "Started with #{concurrency} max thread(s)" }
@concurrency = concurrency
@warn_concurrency = modified_concurrency
+ @fiber_executor = Bolt::FiberExecutor.new
end
def transport(transport)
impl = @transports[transport || 'ssh']
raise(Bolt::UnknownTransportError, transport) unless impl
@@ -371,87 +371,66 @@
def run_plan(scope, plan, params)
plan.call_by_name_with_scope(scope, params, true)
end
- def create_yarn(scope, block, object, index)
- fiber = Fiber.new do
- # Create the new scope
- newscope = Puppet::Parser::Scope.new(scope.compiler)
- local = Puppet::Parser::Scope::LocalScope.new
+ # Call into FiberExecutor to avoid this class getting
+ # overloaded while also minimizing the Puppet lookups needed from plan
+ # functions
+ #
+ def create_future(scope: nil, name: nil, &block)
+ @fiber_executor.create_future(scope: scope, name: name, &block)
+ end
- # Compress the current scopes into a single vars hash to add to the new scope
- current_scope = scope.effective_symtable(true)
- until current_scope.nil?
- current_scope.instance_variable_get(:@symbols)&.each_pair { |k, v| local[k] = v }
- current_scope = current_scope.parent
- end
- newscope.push_ephemerals([local])
+ def plan_complete?
+ @fiber_executor.plan_complete?
+ end
- begin
- result = catch(:return) do
- args = { block.parameters[0][1].to_s => object }
- block.closure.call_by_name_with_scope(newscope, args, true)
- end
+ def round_robin
+ @fiber_executor.round_robin
+ end
- # If we got a return from the block, get it's value
- # Otherwise the result is the last line from the block
- result = result.value if result.is_a?(Puppet::Pops::Evaluator::Return)
+ def in_parallel?
+ @fiber_executor.in_parallel?
+ end
- # Validate the result is a PlanResult
- unless Puppet::Pops::Types::TypeParser.singleton.parse('Boltlib::PlanResult').instance?(result)
- raise Bolt::InvalidParallelResult.new(result.to_s, *Puppet::Pops::PuppetStack.top_of_stack)
- end
+ def wait(futures, **opts)
+ @fiber_executor.wait(futures, **opts)
+ end
- result
- rescue Puppet::PreformattedError => e
- if e.cause.is_a?(Bolt::Error)
- e.cause
- else
- raise e
- end
- end
- end
-
- Bolt::Yarn.new(fiber, index)
+ def plan_futures
+ @fiber_executor.plan_futures
end
- def handle_event(event)
- case event[:type]
- when :node_result
- @thread_completed = true
+ # Execute a plan function concurrently. This function accepts the executor
+ # function to be run and the parameters to pass to it, and returns the
+ # result of running the executor function.
+ #
+ def run_in_thread
+ require 'concurrent'
+ require 'fiber'
+ future = Concurrent::Future.execute do
+ yield
end
- end
- def round_robin(skein)
- subscribe(self, [:node_result])
- results = Array.new(skein.length)
- @in_parallel = true
- publish_event(type: :stop_spin)
-
- until skein.empty?
- @thread_completed = false
- r = nil
-
- skein.each do |yarn|
- if yarn.alive?
- publish_event(type: :stop_spin)
- r = yarn.resume
- else
- results[yarn.index] = yarn.value
- skein.delete(yarn)
- end
- end
-
- next unless r == 'unfinished'
- sleep(0.1) until @thread_completed || skein.empty?
+ # Used to track how often we resume the same executor function
+ still_running = 0
+ # While the thread is still running
+ while future.incomplete?
+ # If the Fiber gets resumed, increment the resume tracker. This means
+ # the tracker starts at 1 since it needs to increment before yielding,
+ # since it can't yield then increment.
+ still_running += 1
+ # If the Fiber has been resumed before, still_running will be 2 or
+ # more. Yield different values for when the same Fiber is resumed
+ # multiple times and when it's resumed the first time in order to know
+ # if progress was made in the plan.
+ Fiber.yield(still_running < 2 ? :something_happened : :returned_immediately)
end
- publish_event(type: :stop_spin)
- @in_parallel = false
- unsubscribe(self, [:node_result])
- results
+ # Once the thread completes, return the result.
+ future.value || future.reason
end
class TimeoutError < RuntimeError; end
def wait_until_available(targets,
@@ -517,10 +496,10 @@
# Plan context doesn't make sense for most transports but it is tightly
# coupled with the orchestrator transport since the transport behaves
# differently when a plan is running. In order to limit how much this
# pollutes the transport API we only handle the orchestrator transport here.
- # Since we callt this function without resolving targets this will result
+ # Since we call this function without resolving targets this will result
# in the orchestrator transport always being initialized during plan runs.
# For now that's ok.
#
# In the future if other transports need this or if we want a plan stack
# we'll need to refactor.