lib/bolt/executor.rb in bolt-3.8.1 vs lib/bolt/executor.rb in bolt-3.9.0

- old
+ new

@@ -5,24 +5,24 @@ require 'json' require 'logging' require 'pathname' require 'set' require 'bolt/analytics' -require 'bolt/result' require 'bolt/config' -require 'bolt/result_set' +require 'bolt/fiber_executor' require 'bolt/puppetdb' +require 'bolt/result' +require 'bolt/result_set' # Load transports require 'bolt/transport/docker' require 'bolt/transport/local' require 'bolt/transport/lxd' require 'bolt/transport/orch' require 'bolt/transport/podman' require 'bolt/transport/remote' require 'bolt/transport/ssh' require 'bolt/transport/winrm' -require 'bolt/yarn' module Bolt TRANSPORTS = { docker: Bolt::Transport::Docker, local: Bolt::Transport::Local, @@ -33,11 +33,11 @@ ssh: Bolt::Transport::SSH, winrm: Bolt::Transport::WinRM }.freeze class Executor - attr_reader :noop, :transports, :in_parallel, :future + attr_reader :noop, :transports, :future attr_accessor :run_as def initialize(concurrency = 1, analytics = Bolt::Analytics::NoopClient.new, noop = false, @@ -64,21 +64,21 @@ @publisher = Concurrent::SingleThreadExecutor.new @publisher.post { Thread.current[:name] = 'event-publisher' } @noop = noop @run_as = nil - @in_parallel = false @future = future @pool = if concurrency > 0 Concurrent::ThreadPoolExecutor.new(name: 'exec', max_threads: concurrency) else Concurrent.global_immediate_executor end @logger.debug { "Started with #{concurrency} max thread(s)" } @concurrency = concurrency @warn_concurrency = modified_concurrency + @fiber_executor = Bolt::FiberExecutor.new end def transport(transport) impl = @transports[transport || 'ssh'] raise(Bolt::UnknownTransportError, transport) unless impl @@ -371,87 +371,66 @@ def run_plan(scope, plan, params) plan.call_by_name_with_scope(scope, params, true) end - def create_yarn(scope, block, object, index) - fiber = Fiber.new do - # Create the new scope - newscope = Puppet::Parser::Scope.new(scope.compiler) - local = Puppet::Parser::Scope::LocalScope.new + # Call into FiberExecutor to avoid this class getting + # overloaded while also minimizing the Puppet lookups needed from plan + # functions + # + def create_future(scope: nil, name: nil, &block) + @fiber_executor.create_future(scope: scope, name: name, &block) + end - # Compress the current scopes into a single vars hash to add to the new scope - current_scope = scope.effective_symtable(true) - until current_scope.nil? - current_scope.instance_variable_get(:@symbols)&.each_pair { |k, v| local[k] = v } - current_scope = current_scope.parent - end - newscope.push_ephemerals([local]) + def plan_complete? + @fiber_executor.plan_complete? + end - begin - result = catch(:return) do - args = { block.parameters[0][1].to_s => object } - block.closure.call_by_name_with_scope(newscope, args, true) - end + def round_robin + @fiber_executor.round_robin + end - # If we got a return from the block, get it's value - # Otherwise the result is the last line from the block - result = result.value if result.is_a?(Puppet::Pops::Evaluator::Return) + def in_parallel? + @fiber_executor.in_parallel? + end - # Validate the result is a PlanResult - unless Puppet::Pops::Types::TypeParser.singleton.parse('Boltlib::PlanResult').instance?(result) - raise Bolt::InvalidParallelResult.new(result.to_s, *Puppet::Pops::PuppetStack.top_of_stack) - end + def wait(futures, **opts) + @fiber_executor.wait(futures, **opts) + end - result - rescue Puppet::PreformattedError => e - if e.cause.is_a?(Bolt::Error) - e.cause - else - raise e - end - end - end - - Bolt::Yarn.new(fiber, index) + def plan_futures + @fiber_executor.plan_futures end - def handle_event(event) - case event[:type] - when :node_result - @thread_completed = true + # Execute a plan function concurrently. This function accepts the executor + # function to be run and the parameters to pass to it, and returns the + # result of running the executor function. + # + def run_in_thread + require 'concurrent' + require 'fiber' + future = Concurrent::Future.execute do + yield end - end - def round_robin(skein) - subscribe(self, [:node_result]) - results = Array.new(skein.length) - @in_parallel = true - publish_event(type: :stop_spin) - - until skein.empty? - @thread_completed = false - r = nil - - skein.each do |yarn| - if yarn.alive? - publish_event(type: :stop_spin) - r = yarn.resume - else - results[yarn.index] = yarn.value - skein.delete(yarn) - end - end - - next unless r == 'unfinished' - sleep(0.1) until @thread_completed || skein.empty? + # Used to track how often we resume the same executor function + still_running = 0 + # While the thread is still running + while future.incomplete? + # If the Fiber gets resumed, increment the resume tracker. This means + # the tracker starts at 1 since it needs to increment before yielding, + # since it can't yield then increment. + still_running += 1 + # If the Fiber has been resumed before, still_running will be 2 or + # more. Yield different values for when the same Fiber is resumed + # multiple times and when it's resumed the first time in order to know + # if progress was made in the plan. + Fiber.yield(still_running < 2 ? :something_happened : :returned_immediately) end - publish_event(type: :stop_spin) - @in_parallel = false - unsubscribe(self, [:node_result]) - results + # Once the thread completes, return the result. + future.value || future.reason end class TimeoutError < RuntimeError; end def wait_until_available(targets, @@ -517,10 +496,10 @@ # Plan context doesn't make sense for most transports but it is tightly # coupled with the orchestrator transport since the transport behaves # differently when a plan is running. In order to limit how much this # pollutes the transport API we only handle the orchestrator transport here. - # Since we callt this function without resolving targets this will result + # Since we call this function without resolving targets this will result # in the orchestrator transport always being initialized during plan runs. # For now that's ok. # # In the future if other transports need this or if we want a plan stack # we'll need to refactor.